In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

## Task of data engineer
Set up scheduled ingestion of data from the application databases to an analytical database

Data Engineer:
    - Cloud technology
    - Develop Scalable Data Architecture
    - Streamline Data Acquisition
    - Set up processes to bring together data
    - Clear Corrupted Data

Data Scientist: 
    - Mining data for pattern
    - Apply statistical models on large datasets
    - Build predictive models using ML
    - Develop tools to monitor business process
    - Clean outliers in data

Data Engineer Problem:

###Data scientists are querying the online store databases directly and slowing down the functioning of the application since it's using the same database. ###
Data Engineer should make sure there is a separate database for analytics

###The online store is slow because the application's database server doesn't have enough memory.### infrastructure problem

## Tools of data engineer
#Processing example
##cluster of computers perform these operations using PySpark framework

# Load the Parquet file into a Spark DataFrame.
# NOTE(review): `spark` must already be bound to a SparkSession before this runs.
df = spark.read.parquet("users.parquet") 

# Keep only the rows whose "age" value is greater than 100.
outliers = df.filter(df["age"] > 100)

# Print how many rows passed the filter.
print(outliers.count())

## Processing tasks
- Data engineers often have to join, clean, or organize data before loading it into a destination analytics database. This is done in the data processing, or data transformation step.
- Data Processing is distributed over clusters of virtual machines(e.g. using Spark)


# Scheduling tools

Make sure jobs run in a specific order and all dependencies are resolved correctly.

Make sure the jobs run at midnight UTC each day.

###Scale up the number of nodes when there's lots of data to be processed.### Jobs of processing tools

Why Cloud computing ?

The cloud can provide you with the resources you need, when you need them.

## Chapter 2: Data engineering toolbox

Databases: A usually large collection of data organized especially for rapid search and retrieval

- Holds data
- Organise data
- Retrieve/ Search Data
Database management system- 
DBMS -much more organised than file systems
- functions: search, replication
Structured vs unstructured data

SQL database schema:
    
## Create Customer table
    -- One row per customer. "id" is a required SERIAL column and the primary key.
    CREATE TABLE "Customer"(
    "id" SERIAL not NULL,
    "first_name" varchar,
    "second_name" varchar,
    PRIMARY KEY ("id")
    )
    
## Create order table
    -- One row per order. "customer_id" is a foreign key to the "Customer" table;
    -- the quoted name must match exactly (no leading space inside the quotes).
    CREATE TABLE "Order"(
    "id" SERIAL not NULL,
    "customer_id" integer REFERENCES "Customer",
    "product_name" varchar,
    "product_price" integer,
    PRIMARY KEY("id")
    )

-- Pair every customer with each of their orders. Both sides of the join
-- condition are qualified with their table name so the columns are unambiguous.
SELECT * FROM "Customer"
INNER JOIN "Order"
ON "Order"."customer_id" = "Customer"."id";

SQL:
    - Customer data in a store's database
    - Always has a database schema
    - MySQL, PostgreSQL
    
NoSQL:
    - can be schemaless
    - Key-value stores: e.g. Redis, used as a caching layer in a distributed web server
    - MongoDB

## The database schema

#db_engine: database engine, which has been defined for you and is called db_engine
#db_engine = Engine(postgresql://repl@/postgres)

# Select each customer's first and last name, sorted by last name and then by
# first name. `db_engine` is the database engine supplied by the environment.
data = pd.read_sql("""
SELECT first_name, last_name FROM "Customer"
ORDER BY last_name, first_name
""", db_engine)

# Show the first 3 rows of the DataFrame
print(data.head(n=3))

# Show the info of the DataFrame. DataFrame.info() writes its summary to stdout
# and returns None, so it is called directly; wrapping it in print() would add
# a stray "None" line to the output.
data.info()

#Complete the SELECT statement
# Joins each customer to their orders; SELECT * returns the columns of both tables.

data = pd.read_sql("""

SELECT * FROM "Customer"

INNER JOIN "Order"

ON "Order"."customer_id"="Customer"."id"

""", db_engine)

#Show the id column of data
# NOTE(review): "Customer" and "Order" both define an "id" column, so the joined
# result presumably carries two columns named id -- confirm what data.id yields.
print(data.id)

In [1]:
## Parallel computing
Parallel computing can optimize the use of multiple processing units.
Parallel computing can optimize the use of memory between several machines.


In [None]:
# Pool runs a function over an iterable using a pool of worker processes.
from multiprocessing import Pool
## function take_mean_Age
def take_mean_age(year_and_group):
    """Compute the mean age for one (year, group) pair.

    Parameters
    ----------
    year_and_group : tuple
        A two-element pair: the year label and the DataFrame of rows belonging
        to that year (the shape produced when iterating over a groupby).

    Returns
    -------
    pandas.DataFrame
        A single-row frame indexed by the year, with one column "Age" holding
        the mean of the group's "Age" column.
    """
    year, group = year_and_group
    mean_age = group["Age"].mean()
    return pd.DataFrame({"Age": mean_age}, index=[year])

# Create a pool of 4 worker processes for the duration of the with-block.
with Pool(4) as p: ##using 4 cores
    # Iterating the groupby yields (year, group) pairs; each pair is handed to take_mean_age.
    results = p.map(take_mean_age, athlete_events.groupby("Year"))
    
## concatenate the results to form the resulting DataFrame
result_df = pd.concat(results)

In [None]:
## Dask framework to avoid write low-level code
import dask.dataframe as dd

## Turn the pandas DataFrame into a Dask DataFrame split into 4 partitions
athlete_events_dask = dd.from_pandas(athlete_events, npartitions = 4)

## Run parallel computations from each partition.
## Dask uses lazy evaluation: the groupby/mean chain only describes the work,
## so .compute() is added to actually run it and return the result.
result_df = athlete_events_dask.groupby("Year").Age.mean().compute()

In [None]:
## From task to subtasks
# Function to apply a function over multiple cores
@print_timing
def parallel_apply(apply_func, groups, nb_cores):
    """Apply a function to every group using a pool of worker processes.

    Parameters
    ----------
    apply_func : callable
        The function being applied; it is called once per element of `groups`
        and must return something `pd.concat` accepts.
    groups : iterable
        The grouping used, e.g. the result of `DataFrame.groupby(...)`.
    nb_cores : int
        The number of worker processes used for the analysis.

    Returns
    -------
    The concatenation (via `pd.concat`) of all per-group results.
    """
    with Pool(nb_cores) as worker_pool:
        per_group_frames = worker_pool.map(apply_func, groups)
    combined = pd.concat(per_group_frames)
    return combined

# Run the same per-year aggregation with 1, 2 and 4 worker processes.
# NOTE(review): presumably meant to compare the timings reported by the
# print_timing decorator on parallel_apply -- confirm what that decorator prints.

# Parallel apply using 1 core
parallel_apply(take_mean_age, athlete_events.groupby('Year'), nb_cores = 1)

# Parallel apply using 2 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), nb_cores = 2)

# Parallel apply using 4 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), nb_cores = 4)

In [6]:
import pandas as pd
athlete_events = pd.read_csv("athlete_events.csv")
athlete_events.head()

Unnamed: 0,ID,Name,Sex,Age,Height,Weight,Team,NOC,Games,Year,Season,City,Sport,Event,Medal
0,1,A Dijiang,M,24.0,180.0,80.0,China,CHN,1992 Summer,1992,Summer,Barcelona,Basketball,Basketball Men's Basketball,
1,2,A Lamusi,M,23.0,170.0,60.0,China,CHN,2012 Summer,2012,Summer,London,Judo,Judo Men's Extra-Lightweight,
2,3,Gunnar Nielsen Aaby,M,24.0,,,Denmark,DEN,1920 Summer,1920,Summer,Antwerpen,Football,Football Men's Football,
3,4,Edgar Lindenau Aabye,M,34.0,,,Denmark/Sweden,DEN,1900 Summer,1900,Summer,Paris,Tug-Of-War,Tug-Of-War Men's Tug-Of-War,Gold
4,5,Christine Jacoba Aaftink,F,21.0,185.0,82.0,Netherlands,NED,1988 Winter,1988,Winter,Calgary,Speed Skating,Speed Skating Women's 500 metres,


In [None]:
## Using a dataframe 
import dask.dataframe as dd

# Set the number of partitions
athlete_events_dask = dd.from_pandas(athlete_events, npartitions = 4)

# Calculate the mean Age per Year
print(athlete_events_dask.groupby('Year').Age.mean().compute())

In [None]:
## Parallel computation framework
# Hive Example
# Select average age of athlete per years they participate
SELECT year,AVG(age)
FROM views.athlete_events
GROUP BY year

In [None]:
## PySpark an example
## Load the dataset into athlete_Events_spark first
(athlete_events_spark
    .groupBy("Year")
    .mean("Age")
    .show())

Hadoop
- MapReduce is a part of it
- Collection of open-source packages for Big Data
- HDFS is a part of it

PySpark
- Uses DataFrame abstraction
- Python interface for the Spark framework

Hive
- Initially used Hadoop MapReduce
- was built out of the need to use structured queries for parallel processing

.printSchema(): helps print the schema of a Spark DataFrame.
.groupBy(): grouping statement for an aggregation.
.mean(): take the mean over each group.
.show(): show the results.

In [None]:
#A PySpark groupby

# Print the type of athlete_events_spark
print(type(athlete_events_spark))

# Print the schema of athlete_events_spark. printSchema() writes the schema to
# stdout itself and returns None, so it is not wrapped in print().
athlete_events_spark.printSchema()

# Group by the Year, and find the mean Age. Without .show() this prints only
# the description of the resulting DataFrame, not its rows.
print(athlete_events_spark.groupBy('Year').mean("Age"))

# Group by the Year, and find the mean Age. show() prints the rows itself and
# returns None, so it is not wrapped in print() either.
athlete_events_spark.groupBy('Year').mean("Age").show()

In [None]:
cat /home/repl/spark-script.py

spark-submit \
  --master local[4] \
  /home/repl/spark-script.py

https://www.analyticsvidhya.com/blog/2018/11/data-engineer-comprehensive-list-resources-get-started/

In [None]:
## Create DAG object
dag = DAG(dag_id = "example_dag", ..., schedule_interval = "0 * * * *")
# Define operations
start_cluster = StartClusterOperator(task_id = "start_cluster", dag = dag)
ingest_customer_data = SparkJobOperator(task_id = "ingest_customer_data", dag =dag)
ingest_product_data = SparkJobOperator(task_id = "ingest_product_data", dag =dag)
enrich_customer_data = PythonOperator (task_id = "enrich_customer_data", dag =dag)

# Set up dependency flow
start_cluster.set_downstream(ingest_customer_data)
ingest_customer_data.set_downstream(enrich_customer_data)
ingest_product_data.set_downstream(enrich_customer_data)

In [None]:
# Create the DAG object
dag = DAG(dag_id="car_factory_simulation",
          default_args={"owner": "airflow","start_date": airflow.utils.dates.days_ago(2)},
          schedule_interval="0 * * * *") # cron expression: run every hour, at minute 0

# Task definitions: each task is a Bash command that echoes the step it stands for
assemble_frame = BashOperator(task_id="assemble_frame", bash_command='echo "Assembling frame"', dag=dag)
place_tires = BashOperator(task_id="place_tires", bash_command='echo "Placing tires"', dag=dag)
assemble_body = BashOperator(task_id="assemble_body", bash_command='echo "Assembling body"', dag=dag)
apply_paint = BashOperator(task_id="apply_paint", bash_command='echo "Applying paint"', dag=dag)

# Complete the downstream flow:
#   assemble_frame -> place_tires
#   assemble_frame -> assemble_body -> apply_paint
assemble_frame.set_downstream(place_tires)
assemble_frame.set_downstream(assemble_body)
assemble_body.set_downstream(apply_paint)

# Chapter 3. Extract transform load

In [3]:
#JSON
import json
result = json.loads('{"key_1" : "value_1","key_2" :"value_2"}')

print(result["key_1"])

value_1


In [5]:
import requests
response = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")
print(response.json())

{'by': 'neis', 'descendants': 0, 'id': 16222426, 'score': 17, 'time': 1516800333, 'title': 'Duolingo-Style Learning for Data Science: DataCamp for Mobile', 'type': 'story', 'url': 'https://medium.com/datacamp/duolingo-style-learning-for-data-science-datacamp-for-mobile-3861d1bc02df'}


In [None]:
## Extraction from databases
import sqlalchemy
# Connection URL layout: postgresql://<user>:<password>@<host>:<port>/<database>
# The credentials and the host are separated by "@" (not "#").
connection_url = "postgresql://repl:password@localhost:5432/pagila"
db_enginer = sqlalchemy.create_engine(connection_url)
## interact with the data
import pandas as pd
pd.read_sql("SELECT * FROM customer",db_enginer)

In [8]:
##Fetch from an API

import requests

# Fetch the Hackernews post
resp = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")

# Print the response parsed as JSON
print(resp.json())

# Assign the score of the post to post_score
post_score = resp.json()["score"]
print(post_score)

{'by': 'neis', 'descendants': 0, 'id': 16222426, 'score': 17, 'time': 1516800333, 'title': 'Duolingo-Style Learning for Data Science: DataCamp for Mobile', 'type': 'story', 'url': 'https://medium.com/datacamp/duolingo-style-learning-for-data-science-datacamp-for-mobile-3861d1bc02df'}
17


In [None]:
## Read from a database
import pandas as pd
# Function to extract table to a pandas DataFrame
def extract_table_to_pandas(tablename, db_engine):
    """Load every row of one database table into a pandas DataFrame.

    Parameters
    ----------
    tablename : str
        Name of the table to read. It is inserted into the SQL text as-is,
        so it must come from a trusted source.
    db_engine
        The database connection/engine handed to `pd.read_sql`.

    Returns
    -------
    pandas.DataFrame
        The result of `SELECT * FROM <tablename>`.
    """
    select_all_rows = f"SELECT * FROM {tablename}"
    table_df = pd.read_sql(select_all_rows, db_engine)
    return table_df

# Connect to the database using the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/pagila" 
db_engine = sqlalchemy.create_engine(connection_uri)

# Extract the film table into a pandas DataFrame
extract_table_to_pandas("film", db_engine)

# Extract the customer table into a pandas DataFrame
extract_table_to_pandas("customer", db_engine)

In [None]:
# Transform

customer_df ## pandas frame with customer data
# Split email column into 2 columns on the @ symbol, first column everything before @, second one everything after @
split_email = customer_df.email.str.split("@", expand = True)

## Create 2 new columns using resulting DataFrame
customer_df = customer_df.assign(
    username = split_email[0],
    domain = split_email[1])

In [1]:
import os
# Point Spark at the local JDK. A space in an environment-variable value needs
# no escaping: "\ " is a shell convention, is an invalid escape sequence in a
# Python string literal, and would leave a literal backslash in the path.
os.environ["JAVA_HOME"] = "C:/Program Files/Java/jdk1.8.0_60"


In [None]:
import pyspark.sql
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [None]:
spark.read.jdbc("jdbc:postgresql://localhost:5432/pagila",
                "customer", #the name of the table
               properties = {"user" : "repl", "password" : "password"})

In [None]:
## Join example using PySpark
customer_df ## PySpark DataFrame with customer data
ratings_df ## PySpark DataFrame with ratings data

## get the mean rating of each customer and add it to the dataframe

# groupby customer id then calculate mean
ratings_per_customer = ratings_df.groupBy("customer_id").mean("rating")

# Join on customer ID
customer_df.join(ratings_per_customer, 
                 customer_df.customer_id == ratings_per_customer.customer_id)