<a href="https://colab.research.google.com/github/SurajKande/Pipelining/blob/master/ETL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ETL: Extract, Transform and Load

## Extract

- Extraction means pulling data out of persistent storage, which is not suited for data processing, and into memory.

- Persistent storage could be a file on S3 or a SQL database.

- It is a necessary stage before the data can be transformed.

- The sources to extract from vary:
    * we can extract data from text files, for example
        - plain text, such as paragraphs in a textbook
        - flat files like CSV or TSV
        - JSON files, which hold information in a semi-structured way
             * JSON objects can be mapped to Python dictionaries using the `json` module
        - etc.
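
As a minimal sketch of the file-based sources listed above, the snippet below parses a JSON string into a Python dictionary with the standard-library `json` module and reads CSV text with `csv.DictReader`. The sample data and the names `raw_json` and `raw_csv` are invented for illustration; in practice the text would come from a file or an object store.

```python
import csv
import io
import json

# Hypothetical JSON document, as it might appear in a .json file
raw_json = '{"id": 1, "title": "Example post", "tags": ["etl", "python"]}'

# json.loads maps a JSON object to a Python dict
post = json.loads(raw_json)
print(post["title"])
print(post["tags"])

# Hypothetical flat-file content (CSV with a header row)
raw_csv = "film_id,title,rental_rate\n1,Film A,0.99\n2,Film B,4.99\n"

# csv.DictReader yields one dict per row, keyed by the header names;
# all values are read as strings
rows = list(csv.DictReader(io.StringIO(raw_csv)))
print(rows[0]["title"], rows[1]["rental_rate"])
```

Note that the CSV values stay strings after extraction; converting them to numbers is part of the Transform phase.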


In [0]:
# Fetch from an API
import requests

# Fetch the Hacker News post
resp = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")

# Print the response parsed as JSON
print(resp.json())

# Assign the score of the post to post_score
post_score = resp.json()['score']
print(post_score)

{'by': 'neis', 'descendants': 0, 'id': 16222426, 'score': 17, 'time': 1516800333, 'title': 'Duolingo-Style Learning for Data Science: DataCamp for Mobile', 'type': 'story', 'url': 'https://medium.com/datacamp/duolingo-style-learning-for-data-science-datacamp-for-mobile-3861d1bc02df'}
17


In [0]:
# Function to extract a table into a pandas DataFrame
import pandas as pd
import sqlalchemy

def extract_table_to_pandas(tablename, db_engine):
    query = "SELECT * FROM {}".format(tablename)
    return pd.read_sql(query, db_engine)

# Connect to the database using the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/pagila"       
db_engine = sqlalchemy.create_engine(connection_uri)

# Extract the film table into a pandas DataFrame
film_dataframe = extract_table_to_pandas("film", db_engine)

# Extract the customer table into a pandas DataFrame
customer_dataframe = extract_table_to_pandas("customer", db_engine)

## Transform

The following is a non-exhaustive list of operations performed on data during the Transform phase:
   -  Data validation
   -  Translation of code values (e.g. NEWYORK --> NY)
   -  Splitting a column into multiple columns
   -  Joining data from various sources
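
The first two operations in the list can be sketched with pandas. The DataFrame below, its column names, and the `state_codes` mapping are invented for illustration and are not part of the pagila database used elsewhere in this notebook.

```python
import pandas as pd

# Hypothetical extracted data containing one invalid row (negative amount)
orders = pd.DataFrame({
    "order_id": [1, 2, 3],
    "state": ["NEWYORK", "CALIFORNIA", "NEWYORK"],
    "amount": [10.0, -5.0, 7.5],
})

# Data validation: keep only rows with a non-negative amount
valid_orders = orders[orders["amount"] >= 0].copy()

# Translation of code values: map long names to short codes
state_codes = {"NEWYORK": "NY", "CALIFORNIA": "CA"}
valid_orders["state"] = valid_orders["state"].map(state_codes)

print(valid_orders)
```

Values missing from the mapping become `NaN` after `.map`, which can itself serve as a signal that a code value needs attention.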
   

In [0]:
# Get the rental rate column as a string
rental_rate_str = film_dataframe.rental_rate.astype(str)

# Split up and expand the column
rental_rate_expanded = rental_rate_str.str.split('.', expand=True)

# Assign the new columns to film_dataframe
film_dataframe = film_dataframe.assign(
    rental_rate_dollar=rental_rate_expanded[0],
    rental_rate_cents=rental_rate_expanded[1],
)

In [0]:
# Pair every film with every customer (there is no join condition, so this is a cross join)
query = "SELECT film.film_id, film.rating, customer.customer_id FROM film, customer"
rating_dataframe = pd.read_sql(query, db_engine)

In [0]:
# Use groupby and mean to aggregate the rating column per film
# (pandas spells it `groupby`; this assumes `rating` holds numeric values)
ratings_per_film_dataframe = (
    rating_dataframe.groupby('film_id', as_index=False)['rating'].mean()
)

# Join the tables on the film_id column
film_dataframe_with_ratings = film_dataframe.merge(
    ratings_per_film_dataframe,
    on='film_id',
    suffixes=('', '_mean'),
)

# Show the first 5 results
print(film_dataframe_with_ratings.head(5))

## Load

The load step writes the extracted and transformed data into an analytics database, where data scientists can analyze it.

 -  Files are often loaded into an MPP (massively parallel processing) database such as Redshift to make the data available for analysis.

 -  MPP databases typically load data best from files that use a columnar storage format.

 -  Parquet is a file format commonly used for this purpose.
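
To see why a columnar layout suits analytical queries, the sketch below stores the same toy records row by row and column by column in plain Python. It only illustrates the idea; real formats such as Parquet add encoding, compression, and metadata on top, and the sample values are invented.

```python
# The same three (hypothetical) records in a row-oriented layout
rows = [
    {"film_id": 1, "title": "Film A", "rental_rate": 0.99},
    {"film_id": 2, "title": "Film B", "rental_rate": 4.99},
    {"film_id": 3, "title": "Film C", "rental_rate": 2.99},
]

# Columnar layout: one list per column
columns = {key: [row[key] for row in rows] for key in rows[0]}

# Row layout: every record is visited even though only one field is needed
avg_from_rows = sum(row["rental_rate"] for row in rows) / len(rows)

# Columnar layout: only the rental_rate column is read
rates = columns["rental_rate"]
avg_from_columns = sum(rates) / len(rates)

print(avg_from_columns)
```

An aggregate over one column touches a single contiguous list in the columnar layout, which is the access pattern analytical queries tend to have.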

In [0]:
# Write the pandas DataFrame to parquet
film_dataframe.to_parquet("films_pdf.parquet")

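# Write the PySpark DataFrame to parquet
# (film_sparkdataframe is assumed to be an existing Spark DataFrame; it is not created in this notebook)
film_sparkdataframe.write.parquet("films_sdf.parquet")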

In [0]:
# Connect to the data warehouse using the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/data_warehouse"
db_engine_data_warehouse = sqlalchemy.create_engine(connection_uri)

# Transformation step: join with the recommendations data
# (`recommendations` is assumed to be a pandas DataFrame sharing film_dataframe's index; it is not defined in this notebook)
film_dataframe_joined = film_dataframe.join(recommendations)

# Write the joined data to the store.film table, replacing it if it already exists
film_dataframe_joined.to_sql("film", db_engine_data_warehouse, schema="store", if_exists="replace")

# Run a query to fetch the loaded data
pd.read_sql("SELECT film_id, recommended_film_ids FROM store.film", db_engine_data_warehouse)
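
The load step can also be tried without a running Postgres server. The sketch below is a stand-in built on assumptions: it uses an in-memory SQLite database through the standard-library `sqlite3` module instead of the `data_warehouse` database, and a small invented DataFrame instead of `film_dataframe_joined`. There is no `store` schema in this sketch, so the `schema` argument is left out.

```python
import sqlite3

import pandas as pd

# In-memory SQLite database standing in for the data warehouse (assumption)
connection = sqlite3.connect(":memory:")

# Small invented DataFrame standing in for film_dataframe_joined
films = pd.DataFrame({
    "film_id": [1, 2],
    "recommended_film_ids": ["2,3", "1,3"],
})

# Load: write the DataFrame to a table, replacing it if it already exists
films.to_sql("film", connection, if_exists="replace", index=False)

# Read the data back to check the load
loaded = pd.read_sql("SELECT film_id, recommended_film_ids FROM film", connection)
print(loaded)

connection.close()
```

Passing `index=False` keeps the DataFrame index out of the table; the Postgres cell above uses the default, which writes the index as an extra column.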