# ETL Workflow

## Extract
Extract data from persistent storage, which is not suited to data processing, into memory.

* Structured
    * Flat file
* Semi-structured
    * JSON
* Unstructured
    * Plain text
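
As a rough illustration of the three categories above, the sketch below reads a tiny sample of each using only Python's standard library. The sample strings and variable names are made up for this example.

```python
import csv
import io
import json

# Hypothetical sample data, inlined so the example runs without any files
flat_file = "film_id,title\n1,Alien\n2,Heat\n"
json_doc = '{"film_id": 1, "tags": ["sci-fi", "horror"]}'
plain_text = "Alien is a 1979 film.\nHeat is a 1995 film."

# Structured: every row has the same columns
rows = list(csv.DictReader(io.StringIO(flat_file)))

# Semi-structured: nested fields that describe themselves
doc = json.loads(json_doc)

# Unstructured: no schema at all, just lines of text
lines = plain_text.splitlines()

print(rows[0]["title"], doc["tags"], len(lines))
```

Note that the flat file yields strings for every value, so types such as integers still have to be converted in the transform step.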
    
Connecting to a database requires a connection string, also called a connection URI or URL.

In [None]:
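# Connection string in Python / create a database engine
import pandas as pd
import sqlalchemy

# URI template: replace the bracketed parts with real values
connection_uri = 'postgresql://[user[:password]@][host][:port][/database]'
db_engine = sqlalchemy.create_engine(connection_uri)

# Read the query result into a pandas DataFrame ("table" is a placeholder name)
pd.read_sql("SELECT * FROM table", db_engine)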
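
To make the URI template above concrete, this minimal sketch splits a filled-in URI into its parts with the standard library. The URI uses placeholder credentials; it is only parsed here, and no connection is opened.

```python
from urllib.parse import urlsplit

# Example URI; user, password, host, port and database are placeholders
connection_uri = "postgresql://repl:password@localhost:5432/dwh"

parts = urlsplit(connection_uri)

print(parts.scheme)            # database dialect
print(parts.username)          # user
print(parts.password)          # password
print(parts.hostname)          # host
print(parts.port)              # port, parsed as an integer
print(parts.path.lstrip("/"))  # database name
```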

In [None]:
# Extract data into PySpark

import pyspark.sql

# Get data into the Spark framework
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# Pass the JDBC URL, the table name (a string), and the credentials
spark.read.jdbc('jdbc:postgresql://[host][:port][/database]',
                'table_name',
                properties={'user': 'repl', 'password': 'password'})

## Transform

* Selection of attributes
* Translation of code values
    * e.g. New York --> NY
* Data validation
    * e.g. checking that 'created date' contains a valid date
* Splitting columns into multiple columns
* Joining from multiple sources
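
The transformations listed above can be sketched in plain Python as follows. This is only an illustration: the records, the `STATE_CODES` mapping, and the `is_valid_date` helper are all hypothetical.

```python
from datetime import datetime

# Hypothetical records from two sources
customers = [
    {"id": 1, "name": "Ada Lovelace", "state": "New York",
     "created": "2020-01-15", "notes": "vip"},
    {"id": 2, "name": "Alan Turing", "state": "California",
     "created": "not a date", "notes": ""},
]
orders = {1: 3, 2: 5}  # customer id -> number of orders

STATE_CODES = {"New York": "NY", "California": "CA"}


def is_valid_date(text):
    """Return True if text is a date in YYYY-MM-DD form."""
    try:
        datetime.strptime(text, "%Y-%m-%d")
        return True
    except ValueError:
        return False


transformed = []
for row in customers:
    # Split one column into two
    first, last = row["name"].split(" ", 1)
    transformed.append({
        "id": row["id"],                              # selection: 'notes' is dropped
        "first_name": first,
        "last_name": last,
        "state": STATE_CODES[row["state"]],           # translate code values
        "created_ok": is_valid_date(row["created"]),  # validate the date
        "orders": orders[row["id"]],                  # join with a second source
    })

for row in transformed:
    print(row)
```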

In [None]:
# EXAMPLE

# Use groupBy and mean to aggregate the column
ratings_per_film_df = rating_df.groupBy('film_id').mean('rating')

# Join the tables using the film_id column
film_df_with_ratings = film_df.join(
    ratings_per_film_df,
    film_df.film_id == ratings_per_film_df.film_id
)

# Show the first 5 results (show() prints the rows itself and returns None)
film_df_with_ratings.show(5)
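
For readers without a Spark session at hand, the same group, aggregate, and join logic can be sketched on small in-memory data. The lists below are hypothetical stand-ins for `rating_df` and `film_df`, and the join shown is an inner join.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical stand-ins for rating_df and film_df
ratings = [
    {"film_id": 1, "rating": 4.0},
    {"film_id": 1, "rating": 5.0},
    {"film_id": 2, "rating": 3.0},
]
films = [
    {"film_id": 1, "title": "Alien"},
    {"film_id": 2, "title": "Heat"},
]

# Group the ratings by film_id
ratings_by_film = defaultdict(list)
for r in ratings:
    ratings_by_film[r["film_id"]].append(r["rating"])

# Aggregate each group to its mean
mean_rating = {film_id: mean(values) for film_id, values in ratings_by_film.items()}

# Inner join: keep only films that have at least one rating
films_with_ratings = [
    {**film, "mean_rating": mean_rating[film["film_id"]]}
    for film in films
    if film["film_id"] in mean_rating
]

for row in films_with_ratings:
    print(row)
```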

## Load

Analytics databases
* Aggregate queries
* Online analytical processing (OLAP)
* Column-oriented
* Queries on a subset of columns
* Better at parallelization

Application databases
* Lots of transactions
* Online transaction processing (OLTP)
* Row-oriented (data stored per record)
* Added per transaction
    * e.g. adding a customer is fast
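
The trade-off between the two layouts can be illustrated with a small sketch. Python lists and dicts only mimic the layout here; real databases differ in how the data is arranged on disk. All names and values are hypothetical.

```python
# Row-oriented: one complete record per entry
row_store = [
    {"id": 1, "name": "Ada", "spend": 120.0},
    {"id": 2, "name": "Alan", "spend": 80.0},
]

# Column-oriented: one list per column
column_store = {
    "id": [1, 2],
    "name": ["Ada", "Alan"],
    "spend": [120.0, 80.0],
}

new_customer = {"id": 3, "name": "Grace", "spend": 50.0}

# Transaction: the row store appends one record in one place
row_store.append(new_customer)

# The column store has to touch every column for the same insert
for column, value in new_customer.items():
    column_store[column].append(value)

# Analytics: the column store reads only the column it needs
total_from_columns = sum(column_store["spend"])

# The row store walks through every full record
total_from_rows = sum(record["spend"] for record in row_store)

print(total_from_columns, total_from_rows)
```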
      
Massively Parallel Processing (MPP) databases, which are usually column-oriented:
* Azure SQL Data Warehouse
* Google BigQuery
* Amazon Redshift

In [None]:
# Write the DataFrame to a file in the right format
# pandas to_parquet() method

df.to_parquet('url')

In [None]:
# Write the DataFrame to a file in the right format
# PySpark .write.parquet() method

df.write.parquet('url')

-- SQL, not Python: load the Parquet file into the customer table (e.g. on Amazon Redshift)
COPY customer
FROM 'url'
FORMAT AS PARQUET;

In [None]:
# Load to PostgreSQL

# Transformation on data
recommendations = transform_find_recommendations(df)

# Load into PostgreSQL database
recommendations.to_sql('tablename',
                       db_engine,
                       schema='store',
                       if_exists='replace')  # or 'append'
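
The difference between replacing and appending can be tried out without a PostgreSQL server. The sketch below uses an in-memory SQLite database from the standard library; the `load` helper is hypothetical and only imitates the two `if_exists` modes, it is not a pandas API.

```python
import sqlite3


def load(conn, table, rows, if_exists="replace"):
    """Write (film_id, score) rows to a table, replacing or appending."""
    if if_exists not in ("replace", "append"):
        raise ValueError("if_exists must be 'replace' or 'append'")
    # The table name is interpolated into the SQL, so it must come from trusted code
    if if_exists == "replace":
        conn.execute(f"DROP TABLE IF EXISTS {table}")
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (film_id INTEGER, score REAL)")
    conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", rows)
    conn.commit()


conn = sqlite3.connect(":memory:")  # throwaway in-memory database

# First load creates the table
load(conn, "recommendations", [(1, 0.9), (2, 0.7)])

# Appending keeps the existing rows
load(conn, "recommendations", [(3, 0.5)], if_exists="append")
count_after_append = conn.execute("SELECT COUNT(*) FROM recommendations").fetchone()[0]

# Replacing drops the existing rows first
load(conn, "recommendations", [(4, 0.8)], if_exists="replace")
count_after_replace = conn.execute("SELECT COUNT(*) FROM recommendations").fetchone()[0]

print(count_after_append, count_after_replace)
```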

## Additional

Example of fetching data from an API

In [None]:
import requests

# Fetch the Hacker News post
resp = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")

# Print the response parsed as JSON
print(resp.json())

# Assign the score of the post to post_score
post_score = resp.json()["score"]
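
Indexing the parsed response with `["score"]` raises a `KeyError` when the field is absent. A slightly more defensive variant might look like the sketch below; it works on a hypothetical JSON string rather than a live response, and `extract_score` is a made-up helper name.

```python
import json

# Hypothetical response body, not the content of the real post
sample_body = '{"id": 0, "type": "story", "score": 42}'


def extract_score(body):
    """Return the integer score from a JSON item, or None if it is missing."""
    item = json.loads(body)
    # The body may be valid JSON without being an object, e.g. null
    if not isinstance(item, dict):
        return None
    score = item.get("score")
    return score if isinstance(score, int) else None


print(extract_score(sample_body))
print(extract_score('{"id": 0}'))
print(extract_score("null"))
```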

Example flow of ETL steps

In [None]:
# EXTRACT
# The connection URI
connection_uri = "postgresql://repl:password@localhost:5432/dwh"
db_engine_dwh = sqlalchemy.create_engine(connection_uri)

# TRANSFORMATION
# Join with recommendations data
film_pdf_joined = film_pdf.join(recommendations)

# LOAD
# .to_sql() to write to store.film
film_pdf_joined.to_sql("film", db_engine_dwh, schema="store", if_exists="replace")

# ADDITIONAL
# Run the query to fetch the data
pd.read_sql("SELECT film_id, recommended_film_ids FROM store.film", db_engine_dwh)  # optionally params={'arg': var}
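
The same extract, transform, load sequence can be run end to end without a data warehouse. The sketch below is one possible arrangement, assuming an in-memory SQLite database that serves as both source and target; the table names, sample rows, and the three helper functions are hypothetical. The final query passes its value as a named parameter instead of formatting it into the SQL text.

```python
import sqlite3


def extract(conn):
    """Read all films from the source table."""
    return conn.execute("SELECT film_id, title FROM film").fetchall()


def transform(films, recommendations):
    """Attach the recommended film ids (comma-separated) to each film."""
    return [
        (film_id, title, ",".join(str(i) for i in recommendations.get(film_id, [])))
        for film_id, title in films
    ]


def load(conn, rows):
    """Replace the target table with the transformed rows."""
    conn.execute("DROP TABLE IF EXISTS store_film")
    conn.execute(
        "CREATE TABLE store_film (film_id INTEGER, title TEXT, recommended_film_ids TEXT)"
    )
    conn.executemany("INSERT INTO store_film VALUES (?, ?, ?)", rows)
    conn.commit()


# Hypothetical source data in a throwaway in-memory database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE film (film_id INTEGER, title TEXT)")
conn.executemany("INSERT INTO film VALUES (?, ?)", [(1, "Alien"), (2, "Heat")])
recommendations = {1: [2], 2: [1]}

# Run the three steps in order
load(conn, transform(extract(conn), recommendations))

# Parameterized query: the value is passed separately from the SQL text
result = conn.execute(
    "SELECT film_id, recommended_film_ids FROM store_film WHERE film_id = :film_id",
    {"film_id": 1},
).fetchall()
print(result)
```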