# Introduction to Data Engineering

## 2 - Data engineering toolbox

#### Database schema


In [2]:
# Complete the SELECT statement
data = pd.read_sql("""
SELECT first_name, last_name FROM "Customer"
ORDER BY last_name, first_name
""", db_engine)

# Show the first 3 rows of the DataFrame
print(data.head(3))

# Show the info of the DataFrame
print(data.info())

NameError: name 'pd' is not defined

#### Joining on relations

In [4]:
# Complete the SELECT statement
data = pd.read_sql("""
SELECT * FROM "Customer"
INNER JOIN "Order"
ON "Order"."customer_id"="Customer"."id"
""", db_engine)

# Show the id column of data
print(data.id)

NameError: name 'pd' is not defined

#### From task to subtasks

In [5]:
# Function to apply a function over multiple cores
@print_timing
def parallel_apply(apply_func, groups, nb_cores):
    with Pool(nb_cores) as p:
        results = p.map(apply_func, groups)
    return pd.concat(results)

# Parallel apply using 1 core
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 1)

# Parallel apply using 2 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 2)

# Parallel apply using 4 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 4)

NameError: name 'print_timing' is not defined

#### Using a DataFrame

In [6]:
import dask.dataframe as dd

# Set the number of pratitions
athlete_events_dask = dd.from_pandas(athlete_events, npartitions = 4)

# Calculate the mean Age per Year
print(athlete_events_dask.groupby('Year').Age.mean().compute())

NameError: name 'athlete_events' is not defined

#### A PySpark groupby

In [7]:
# Print the type of athlete_events_spark
print(type(athlete_events_spark))

# Print the schema of athlete_events_spark
print(athlete_events_spark.printSchema())

# Group by the Year, and find the mean Age
print(athlete_events_spark.groupBy('Year').mean('Age'))

# Group by the Year, and find the mean Age
print(athlete_events_spark.groupBy('Year').mean('Age').show())

NameError: name 'athlete_events_spark' is not defined

#### Running PySpark files

In [None]:
repl:~$ cat /home/repl/spark-script.py
from pyspark.sql import SparkSession


if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    athlete_events_spark = (spark
        .read
        .csv("/home/repl/datasets/athlete_events.csv",
             header=True,
             inferSchema=True,
             escape='"'))

    athlete_events_spark = (athlete_events_spark
        .withColumn("Height",
                    athlete_events_spark.Height.cast("integer")))

    print(athlete_events_spark
        .groupBy('Year')
        .mean('Height')
        .orderBy('Year')
        .show())

#### Parallel computing



In [8]:
from multiprocessing import Pool
import pandas as pd

def take_mean_age(year_and_group):
    year, group = year_and_group
    return pd.DataFrame({"Age": group["Age"].mean()}, index=[year])
    
# defining 4 as an argument to Pool, the mapping runs in 4 separate processes
# thus uses 4 cores
with Pool(4) as p:
    results = p.map(take_mean_age, athlete_events.groupby("Year"))
    
result_df = pd.concat(results)

NameError: name 'athlete_events' is not defined

#### Parallel computing using Dask framework

In [9]:
import dask.dataframe as dd

# partition dataframe into 4
athlete_events_dask = dd.from_pandas(athlete_events, npartitions = 4)

# run parallel computation on each partition
# dask uses lazy evaluation, you need to add compute() at the end
result_df = athlete_events_dask.groupby('Year').Age.mean().compute()

NameError: name 'athlete_events' is not defined

#### example 1

In [11]:
# Function to apply a funciton over multiple cores
def take_mean_age(year_and_group):
    year, group = year_and_group
    return pd.DataFrame({"Age": group["Age"].mean()}, index=[year])

@print_timing
def parallel_apply(apply_func, groups, nb_cores):
    with Pool(nb_cores) as p:
        results = p.map(apply_func, groups)
    return pd.concat(results)

#parallel apply using 1 core
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 1)

#parallel apply using 2 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 2)

#parallel apply using 4 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 4)

NameError: name 'print_timing' is not defined

#### example 2 (Dask)

In [13]:
import dask.dataframe as dd

#set the number of partitions
athlete_events_dask = dd.from_pandas(athlete_events, npartitions = 4)

#calculate the mean age per year
print(athlete_events_dask.groupby('Year').Age.mean().compute()

SyntaxError: unexpected EOF while parsing (<ipython-input-13-6b2faa29dd8a>, line 7)

#### Airflow DAGs

In [14]:
# Create the DAG object
dag = DAG(dag_id="car_factory_simulation",
          default_args={"owner": "airflow", "start_date": airflow.utils.dates.days_ago(2)},
          schedule_interval="0 * * * *")
          
# Task definitions
assemble_frame = BashOperator(task_id="assemble_frame", 
                              bash_command='echo "Assembling frame"', dag=dag)
place_tires = BashOperator(task_id="place_tires", 
                           bash_command='echo "Placing tires"', dag=dag)
assemble_body = BashOperator(task_id="assemble_body", 
                             bash_command='echo "Assembling body"', dag=dag)
apply_paint = BashOperator(task_id="apply_paint", 
                           bash_command='echo "Applying paint"', dag=dag)

# Complete the downstream flow
assemble_frame.set_downstream(place_tires)
assemble_frame.set_downstream(assemble_body)
assemble_body.set_downstream(apply_paint)

NameError: name 'DAG' is not defined

## 3 - Extract, Transform and Load (ETL)

#### Extract

#### - data on the web through APIs


In [17]:
import requests

reponse = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")
print(reponse.json())

{'by': 'neis', 'descendants': 0, 'id': 16222426, 'score': 17, 'time': 1516800333, 'title': 'Duolingo-Style Learning for Data Science: DataCamp for Mobile', 'type': 'story', 'url': 'https://medium.com/datacamp/duolingo-style-learning-for-data-science-datacamp-for-mobile-3861d1bc02df'}


#### - data in database

In [28]:
# connection string/URI
# postgresql://[user][:password]@[host][:port]

import sqlalchemy

# Connect to the database using the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/pagila" 
db_engine = sqlalchemy.create_engine(connection_uri)

#### example 1 - Fetch from an API

In [25]:
import requests

#Fetch the Hackernews postgresql
resp = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")

# Print the response parsed as JSON
print(resp.json())

# Assign the score of the test to post_score
post_score = resp.json()["score"]
print(post_score)

{'by': 'neis', 'descendants': 0, 'id': 16222426, 'score': 17, 'time': 1516800333, 'title': 'Duolingo-Style Learning for Data Science: DataCamp for Mobile', 'type': 'story', 'url': 'https://medium.com/datacamp/duolingo-style-learning-for-data-science-datacamp-for-mobile-3861d1bc02df'}
17


#### example 2 - Read from a database

In [None]:
import pandas as pd

def extract_table_to_pandas(tablename, db_engine):
    query = "SELECT * FROM {}".format(tablename)
    return pd.read_sql(query, db_engine)
    
# Connect to the database using the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/pagila" 
db_engine = sqlalchemy.create_engine(connection_uri)

# Extract the film table into a pandas DataFrame
extract_table_to_pandas("film", db_engine)

#### Transform


#### Split (Pandas)


In [7]:
import pandas as pd
customer = [{'customer_id':1, 'email': 'jane.doe@theweb.com'}]
customer_df = pd.DataFrame(customer)

# Split email conlumn into 2 columns on the '@' symbol
split_email = customer_df.email.str.split('@', expand =True)

customer_df = customer_df.assign(
    username = split_email[0],
    domain = split_email[1]
)
print(customer_df)

customer_id                email  username      domain
0            1  jane.doe@theweb.com  jane.doe  theweb.com


#### Transforming in PySpark

In [12]:
# Extract data into PySpark

import pyspark.sql

spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark.read.jdbc("jdbc:postgresql://localhost:5432/pagila",
                "customer",
                properties={"user":"repl", "password":"password"})

Py4JJavaError: An error occurred while calling o49.jdbc.
: java.sql.SQLException: No suitable driver
	at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$6.apply(JDBCOptions.scala:105)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$6.apply(JDBCOptions.scala:105)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:104)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:35)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
	at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:238)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:830)


#### example 1 - Splitting the rental rate

In [13]:
# Get the rental rate column as a string
rental_rate_str = film_df.rental_rate.astype(str)

# Split up and expand the column
rental_rate_expanded = rental_rate_str.str.split('.', expand=True)

# Assign the columns to film_df
film_df = film_df.assign(
    rental_rate_dollar=rental_rate_expanded[0],
    rental_rate_cents=rental_rate_expanded[1],
)

NameError: name 'film_df' is not defined

#### example 2 - Joining with ratings


In [14]:
# Use groupBy and mean to aggregate the column
ratings_per_film_df = rating_df.groupBy('film_id').mean('rating')

# Join the tables using the film_id column
film_df_with_ratings = film_df.join(
    ratings_per_film_df,
    film_df.film_id==ratings_per_film_df.film_id
)

# Show the 5 first results
print(film_df_with_ratings.show(5))

NameError: name 'rating_df' is not defined

#### Load

Analytics (OLAP)
- column-oriented
- queries about subset of columns

Application databases (OLTP)
- stored per record (row-oriented)
- added per transaction
- e.g. adding customer is fast

Column-oriented databases also are better to paralleization
MPP databases (Massively Parallel Processing Databases)
- e.g. Amazon Redshift, Azuer SQL Data Warehouse, Google BigQuery

#### Writing to a file (parquet format)

In [None]:
# Write the pandas DataFrame to parquet
film_pdf.to_parquet('films_pdf.parquet')

# Write the PySpark DataFrame to parquet
film_sdf.write.parquet('films_sdf.parquet')

#### Load into Postgres

In [None]:
# Structure of connection URI for sqlalchemy
# postgresql://[user[:password]@][host][:port][/database]

# Finish the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/dwh"
db_engine_dwh = sqlalchemy.create_engine(connection_uri)

# Transformation step, join with recommendations data
film_pdf_joined = film_pdf.join(recommendations)

# Finish the .to_sql() call to write to store.film
film_pdf_joined.to_sql("film", db_engine_dwh, schema="store", if_exists="replace")

# Run the query to fetch the data
pd.read_sql("SELECT film_id, recommended_film_ids FROM store.film", db_engine_dwh)

#### Putting it all together 

The ETL function

def extract_table_do_df(tablename, db_engine)

def split_columns_transform(df, columnm pat, suffixes)
(note:converts column into str and splits it on pat..)

def load_df_into_dwh(film_df, tablename, schema, db_engine)

def etl():

\# Extract

film_df = extract_table_do_df('film', db_engines['store'])

\# Transform 

film_df = split_columns_transform(film_df, 'rental_rate', '.', \['_dollar', '_cents'\])

\# Load 

load_df_into_dwh(film_df, 'film', 'store', db_engines['dwh'])


#### Defining a DAG

- Extract: Extract the `film` PostgreSQL table into `pandas`
- Transform: Split the `rental_rate` column of the `film` DataFrame
- Load: Load the `film` DataFrame into a PostgreSQL data warehouse

Add an ETL task to an existing DAG. The DAG to extentd and the task to wait for are defined.

In [15]:
# Define the ETL function
def etl():
    film_df = extract_film_to_pandas()
    film_df = transform_rental_rate(film_df)
    load_dataframe_to_film(film_df)

# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=etl,
                          dag=dag)

# Set the upstream to wait_for_table and sample run etl()
etl_task.set_upstream(wait_for_table)
etl()

NameError: name 'PythonOperator' is not defined

## 4 - Case Study - Datacamp Recommended Course

ETL Process:
datacamp_application > cleaning > Calculate recommendations > dataware house

#### Part 1

In [None]:
1. Goal: Get a feeling for the data.

In [None]:
## Querying the table

# Complete the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/datacamp_application"
db_engine = sqlalchemy.create_engine(connection_uri)

# Get user with id 4387
user1 = pd.read_sql("SELECT * FROM rating WHERE user_id=4387", db_engine)

# Get user with id 18163
user2 = pd.read_sql("SELECT * FROM rating WHERE user_id=18163", db_engine)

# Get user with id 8770
user3 = pd.read_sql("SELECT * FROM rating WHERE user_id=8770", db_engine)

# Use the helper function to compare the 3 users
print_user_comparison(user1, user2, user3)

2. Goal: Get a DataFrame with two columns, a course and its average rating.

In [None]:
## Average rating per course

# Complete the transformation function
def transform_avg_rating(rating_data):
  # Group by course_id and extract average rating per course
  avg_rating = rating_data.groupby('course_id').rating.mean()
  # Return sorted average ratings per course
  sort_rating = avg_rating.sort_values(ascending=False).reset_index()
  return sort_rating

# Extract the rating data into a DataFrame    
rating_data = extract_rating_data(db_engines)

# Use transform_avg_rating on the extracted data and print results
avg_rating_data = transform_avg_rating(rating_data)
print(avg_rating_data) 

#### Part 2 
Recommendation techniques

-Matrix factorization

-Building Recommendation Engines with PySpark

3. Goal: take rating table and extrapolate three courses that would be nice to recommend

Common sense recommendation transformation

- Use technology that user has rated most

- Don't recommend courses that user already rated

- Recommend three highest rated courses from remaining combinations

In [None]:
## Filter out corrupt data

course_data = extract_course_data(db_engines)

# Print out the number of missing values per column
print(course_data.isnull().sum())

# The transformation should fill in the missing values
def transform_fill_programming_language(course_data):
    imputed = course_data.fillna({"programming_language": "r"})
    return imputed

transformed = transform_fill_programming_language(course_data)

# Print out the number of missing values per column of transformed
print(transformed.isnull().sum())

In [None]:
## Using the recommender transformation

# Complete the transformation function
def transform_recommendations(avg_course_ratings, courses_to_recommend):
    # Merge both DataFrames
    merged = courses_to_recommend.merge(avg_course_ratings) 
    # Sort values by rating and group by user_id
    grouped = merged.sort_values("rating", ascending = False).groupby('user_id')
    # Produce the top 3 values and sort by user_id
    recommendations = grouped.head(3).sort_values("user_id").reset_index()
    final_recommendations = recommendations[["user_id", "course_id","rating"]]
    # Return final recommendations
    return final_recommendations

# Use the function with the predefined DataFrame objects
recommendations = transform_recommendations(avg_course_ratings, courses_to_recommend)

#### Part 3 - Scheduling daily jobs



Loading  to Postgres

- Use the calculations in data products

- Update daily

- Example user case: sending out e-mails with recommendations


In [None]:
## The target table

connection_uri = "postgresql://repl:password@localhost:5432/dwh"
db_engine = sqlalchemy.create_engine(connection_uri)

def load_to_dwh(recommendations):
    recommendations.to_sql("recommendations", db_engine, if_exists="replace")

In [None]:
## Defining DAGs

# Define the DAG so it runs on a daily basis
dag = DAG(dag_id="recommendations",
          schedule_interval='0 0 * * *')

# Make sure `etl()` is called in the operator. Pass the correct kwargs.
task_recommendations = PythonOperator(
    task_id="recommendations_task",
    python_callable=etl,
    op_kwargs={"db_engines": db_engines},
)

#### Part 4 Query the recommendations

It can be used to product important features for DataCamp student such as customized marketing emails, intelligent recommendations for students, etc.


In [None]:
## Querying the recommendations

def recommendations_for_user(user_id, threshold=4.5):
    # Join with the courses table
    query = """
            SELECT title, rating FROM recommendations
            INNER JOIN courses ON courses.course_id = recommendations.course_id
            WHERE user_id=%(user_id)s AND rating>%(threshold)s
            ORDER BY rating DESC
        """
    # Add the threshold parameter
    predictions_df = pd.read_sql(query, db_engine, params = {"user_id": user_id, 
                                                           "threshold": threshold})
    return predictions_df.title.values

# Try the function you created
print(recommendations_for_user(12, 4.65))