### We need to simulate the arrival of ADD, FULL and DELTA flows to our Data Lake. Let's use for this purpose the dump of a retail database. We will load it into mysql and generate csv files to illustrate the arrival of regular flows into our Transfer Hub.

In [1]:
import pymysql

db = pymysql.connect(host="localhost", user='hiveuser', password='hivepassword')
cur = db.cursor()
cur.execute("CREATE DATABASE retail_db")
cur.execute("USE retail_db")
cur.execute(open("dump.sql").read())
cur.close()
db.close()

In [2]:
import pandas as pd

db = pymysql.connect(host="localhost", user='hiveuser', password='hivepassword', db="retail_db")
pd.read_sql("SHOW TABLES", db)

Unnamed: 0,Tables_in_retail_db
0,categories
1,customers
2,departments
3,order_items
4,orders
5,products


In [3]:
pd.read_sql("DESCRIBE customers", db)

Unnamed: 0,Field,Type,Null,Key,Default,Extra
0,customer_id,int(11),NO,PRI,,auto_increment
1,customer_fname,varchar(45),NO,,,
2,customer_lname,varchar(45),NO,,,
3,customer_email,varchar(45),NO,,,
4,customer_password,varchar(45),NO,,,
5,customer_street,varchar(255),NO,,,
6,customer_city,varchar(45),NO,,,
7,customer_state,varchar(45),NO,,,
8,customer_zipcode,varchar(45),NO,,,


In [4]:
pd.read_sql("SELECT * FROM orders LIMIT 5", db)

Unnamed: 0,order_id,order_date,order_customer_id,order_status
0,1,2013-07-25,11599,CLOSED
1,2,2013-07-25,256,PENDING_PAYMENT
2,3,2013-07-25,12111,COMPLETE
3,4,2013-07-25,8827,CLOSED
4,5,2013-07-25,11318,COMPLETE


### generating ADD Flow: daily orders

#### Schemas will be stored in memory for this tutorial - we will explore in a later post how to select the right repository

In [5]:
pd.read_sql("SELECT * FROM orders WHERE order_date='2014-01-01'", db).to_csv('orders20140101.csv', header=False, index=False, quoting=1)
pd.read_sql("SELECT * FROM orders WHERE order_date='2014-01-02'", db).to_csv('orders20140102.csv', header=False, index=False, quoting=1)
pd.read_sql("SELECT * FROM orders WHERE order_date='2014-01-03'", db).to_csv('orders20140103.csv', header=False, index=False, quoting=1)

schemaStringADD = ','.join(pd.read_sql("DESCRIBE orders", db).Field)
print schemaStringADD
typeStringADD = 'INT,STRING,INT,STRING'

order_id,order_date,order_customer_id,order_status


### generating FULL Flow: products

In [6]:
pd.read_sql("SELECT * FROM products", db).to_csv('products20150101.csv', header=False, index=False, quoting=1)
pd.read_sql("SELECT * FROM products", db).to_csv('products20150102.csv', header=False, index=False, quoting=1)

schemaStringFULL = ','.join(pd.read_sql("DESCRIBE products", db).Field)
print schemaStringFULL
typeStringFULL = 'INT,INT,STRING,STRING,FLOAT,STRING'

product_id,product_category_id,product_name,product_description,product_price,product_image


### generating DELTA Flow: customers

In [7]:
pd.read_sql("SELECT * FROM customers", db).to_csv('customers_ref.csv', header=False, index=False, quoting=1)
pd.read_sql("SELECT * FROM customers WHERE customer_state='NY'", db).to_csv('customers20150101.csv', header=False, index=False, quoting=1)

schemaStringDELTA = ','.join(pd.read_sql("DESCRIBE customers", db).Field)
print schemaStringDELTA
typeStringDELTA = 'INT,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING'

customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_street,customer_city,customer_state,customer_zipcode


### Let's move the generated files into HDFS

#### we will explore later how to move files appropriately between Local and HDFS

In [8]:
#! hdfs dfs -rm -r /user/datacruncher/in
#! hdfs dfs -rm -r /user/datacruncher/raw
#! hdfs dfs -rm -r /user/datacruncher/out
! hdfs dfs -mkdir -p /user/datacruncher/in
! hdfs dfs -mkdir -p /user/datacruncher/raw
! hdfs dfs -put orders*.csv /user/datacruncher/in/
! hdfs dfs -put products*.csv /user/datacruncher/in/
! hdfs dfs -put customers*.csv /user/datacruncher/in/
! rm orders*.csv products*.csv customers*.csv

15/12/18 09:51:04 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/datacruncher/in
15/12/18 09:51:07 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/datacruncher/raw
15/12/18 09:51:10 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/datacruncher/out


### Create Hive Tables

In [9]:
sqlStringADD = ', '.join([' '.join(x) for x in zip(schemaStringADD.split(','), typeStringADD.split(','))])
print sqlStringADD
sqlStringFULL = ', '.join([' '.join(x) for x in zip(schemaStringFULL.split(','), typeStringFULL.split(','))])
print sqlStringFULL
sqlStringDELTA = ', '.join([' '.join(x) for x in zip(schemaStringDELTA.split(','), typeStringDELTA.split(','))])
print sqlStringDELTA

order_id INT, order_date STRING, order_customer_id INT, order_status STRING
product_id INT, product_category_id INT, product_name STRING, product_description STRING, product_price FLOAT, product_image STRING
customer_id INT, customer_fname STRING, customer_lname STRING, customer_email STRING, customer_password STRING, customer_street STRING, customer_city STRING, customer_state STRING, customer_zipcode STRING


In [10]:
import pyhs2
conn = pyhs2.connect(host='localhost', port=10000, authMechanism="PLAIN", user='nasdag', password='', database='default')
cur = conn.cursor()
cur.execute("drop table orders")
cur.execute("drop table products")
#cur.execute("drop table customers")
cur.execute("CREATE EXTERNAL TABLE orders (" + sqlStringADD + """)
        COMMENT 'orders table'
        PARTITIONED BY (crunch_date STRING)
        STORED AS PARQUET
        LOCATION '/user/datacruncher/out/orders'""")
cur.execute("CREATE EXTERNAL TABLE products (" + sqlStringFULL + """)
        COMMENT 'products table'
        PARTITIONED BY (crunch_date STRING)
        STORED AS PARQUET
        LOCATION '/user/datacruncher/out/products'""")
#cur.execute("CREATE EXTERNAL TABLE customers (" + sqlStringDELTA + """)
#        COMMENT 'customers table'
#        PARTITIONED BY (crunch_date STRING)
#        STORED AS PARQUET
#        LOCATION '/user/datacruncher/out/customers'""")

### Start pyspark to process the data flows 

In [11]:
#sc.stop()
import pyspark
sc = pyspark.SparkContext()

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

#### create the appropriate schemas from the schema strings

In [21]:
from pyspark.sql.types import *
st = { 'INT': IntegerType(), 'STRING': StringType() , 'FLOAT': FloatType() }

fieldsADD = [StructField(field_name, st[field_type], False) for field_name, field_type in \
          zip(schemaStringADD.split(','), typeStringADD.split(','))]
schemaADD = StructType(fieldsADD)
print schemaADD

fieldsFULL = [StructField(field_name, st[field_type], False) for field_name, field_type in \
          zip(schemaStringFULL.split(','), typeStringFULL.split(','))]
schemaFULL = StructType(fieldsFULL)
print schemaFULL

StructType(List(StructField(order_id,IntegerType,false),StructField(order_date,StringType,false),StructField(order_customer_id,IntegerType,false),StructField(order_status,StringType,false)))
StructType(List(StructField(product_id,IntegerType,false),StructField(product_category_id,IntegerType,false),StructField(product_name,StringType,false),StructField(product_description,StringType,false),StructField(product_price,FloatType,false),StructField(product_image,StringType,false)))


#### load the csv flow according to its schema

In [13]:
df = sqlContext.read.format("com.databricks.spark.csv").schema(schemaADD).load('/user/datacruncher/in/orders20140101.csv')

In [14]:
df.first()

Row(order_id=25876, order_date=u'2014-01-01 00:00:00', order_customer_id=3414, order_status=u'PENDING_PAYMENT')

#### add the crunch date column

In [15]:
sqlContext.registerDataFrameAsTable(df, "df")
dfWithCrunchDate = sqlContext.sql("SELECT *, '2015-12-18' as crunch_date FROM df")
dfWithCrunchDate.first()

Row(order_id=25876, order_date=u'2014-01-01 00:00:00', order_customer_id=3414, order_status=u'PENDING_PAYMENT', crunch_date=u'2015-12-18')

#### append to the parquet table and update Hive

In [16]:
dfWithCrunchDate.write.format("parquet").mode('Append').partitionBy('crunch_date').parquet('/user/datacruncher/out/orders')
sqlContext.sql("MSCK REPAIR TABLE orders")

DataFrame[result: string]

#### move the original file to a raw directory

In [17]:
! hdfs dfs -mv /user/datacruncher/in/orders20140101.csv /user/datacruncher/raw/

#### query the content of Hive through JDBC

In [18]:
conn = pyhs2.connect(host='localhost', port=10000, authMechanism="PLAIN", user='nasdag', password='', database='default')
cur = conn.cursor()
cur.execute("select * from orders order by order_id limit 10")
pd.DataFrame(cur.fetchall())

Unnamed: 0,0,1,2,3,4
0,25876,2014-01-01 00:00:00,3414,PENDING_PAYMENT,2015-12-18
1,25877,2014-01-01 00:00:00,5549,PENDING_PAYMENT,2015-12-18
2,25878,2014-01-01 00:00:00,9084,PENDING,2015-12-18
3,25879,2014-01-01 00:00:00,5118,PENDING,2015-12-18
4,25880,2014-01-01 00:00:00,10146,CANCELED,2015-12-18
5,25881,2014-01-01 00:00:00,3205,PENDING_PAYMENT,2015-12-18
6,25882,2014-01-01 00:00:00,4598,COMPLETE,2015-12-18
7,25883,2014-01-01 00:00:00,11764,PENDING,2015-12-18
8,25884,2014-01-01 00:00:00,7904,PENDING_PAYMENT,2015-12-18
9,25885,2014-01-01 00:00:00,7253,PENDING,2015-12-18


#### repeat the same for FULL

In [24]:
df = sqlContext.read.format("com.databricks.spark.csv").schema(schemaFULL).load('/user/datacruncher/in/products20150101.csv')
sqlContext.registerDataFrameAsTable(df, "df")
dfWithCrunchDate = sqlContext.sql("SELECT *, '2015-12-18' as crunch_date FROM df")
dfWithCrunchDate.write.format("parquet").mode('Append').partitionBy('crunch_date').parquet('/user/datacruncher/out/products')
sqlContext.sql("MSCK REPAIR TABLE products")
! hdfs dfs -mv /user/datacruncher/in/products20150101.csv /user/datacruncher/raw/

df = sqlContext.read.format("com.databricks.spark.csv").schema(schemaFULL).load('/user/datacruncher/in/products20150102.csv')
sqlContext.registerDataFrameAsTable(df, "df")
dfWithCrunchDate = sqlContext.sql("SELECT *, '2015-12-19' as crunch_date FROM df")
dfWithCrunchDate.write.format("parquet").mode('Append').partitionBy('crunch_date').parquet('/user/datacruncher/out/products')
sqlContext.sql("MSCK REPAIR TABLE products")
! hdfs dfs -mv /user/datacruncher/in/products20150102.csv /user/datacruncher/raw/

cur = conn.cursor()
cur.execute("select * from products limit 10")
pd.DataFrame(cur.fetchall())

Unnamed: 0,0,1,2,3,4,5,6
0,1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+F...,2015-12-18
1,2,2,Under Armour Men's Highlight MC Football Clea,,129.990005,http://images.acmesports.sports/Under+Armour+M...,2015-12-18
2,3,2,Under Armour Men's Renegade D Mid Football Cl,,89.989998,http://images.acmesports.sports/Under+Armour+M...,2015-12-18
3,4,2,Under Armour Men's Renegade D Mid Football Cl,,89.989998,http://images.acmesports.sports/Under+Armour+M...,2015-12-18
4,5,2,Riddell Youth Revolution Speed Custom Footbal,,199.990005,http://images.acmesports.sports/Riddell+Youth+...,2015-12-18
5,6,2,Jordan Men's VI Retro TD Football Cleat,,134.990005,http://images.acmesports.sports/Jordan+Men%27s...,2015-12-18
6,7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.989998,http://images.acmesports.sports/Schutt+Youth+R...,2015-12-18
7,8,2,Nike Men's Vapor Carbon Elite TD Football Cle,,129.990005,http://images.acmesports.sports/Nike+Men%27s+V...,2015-12-18
8,9,2,Nike Adult Vapor Jet 3.0 Receiver Gloves,,50.0,http://images.acmesports.sports/Nike+Adult+Vap...,2015-12-18
9,10,2,Under Armour Men's Highlight MC Football Clea,,129.990005,http://images.acmesports.sports/Under+Armour+M...,2015-12-18


#### create a view for Hive queries

In [38]:
print sqlStringFULL
print schemaStringFULL
cur = conn.cursor()
cur.execute("CREATE VIEW IF NOT EXISTS prod (" + schemaStringFULL + """)
            AS SELECT """ + schemaStringFULL + """ FROM products
            WHERE crunch_date = '2015-12-19' """)
cur.execute("select * from prod order by product_id limit 10")
pd.DataFrame(cur.fetchall())

product_id INT, product_category_id INT, product_name STRING, product_description STRING, product_price FLOAT, product_image STRING
product_id,product_category_id,product_name,product_description,product_price,product_image


Unnamed: 0,0,1,2,3,4,5
0,1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+F...
1,2,2,Under Armour Men's Highlight MC Football Clea,,129.990005,http://images.acmesports.sports/Under+Armour+M...
2,3,2,Under Armour Men's Renegade D Mid Football Cl,,89.989998,http://images.acmesports.sports/Under+Armour+M...
3,4,2,Under Armour Men's Renegade D Mid Football Cl,,89.989998,http://images.acmesports.sports/Under+Armour+M...
4,5,2,Riddell Youth Revolution Speed Custom Footbal,,199.990005,http://images.acmesports.sports/Riddell+Youth+...
5,6,2,Jordan Men's VI Retro TD Football Cleat,,134.990005,http://images.acmesports.sports/Jordan+Men%27s...
6,7,2,Schutt Youth Recruit Hybrid Custom Football H,,99.989998,http://images.acmesports.sports/Schutt+Youth+R...
7,8,2,Nike Men's Vapor Carbon Elite TD Football Cle,,129.990005,http://images.acmesports.sports/Nike+Men%27s+V...
8,9,2,Nike Adult Vapor Jet 3.0 Receiver Gloves,,50.0,http://images.acmesports.sports/Nike+Adult+Vap...
9,10,2,Under Armour Men's Highlight MC Football Clea,,129.990005,http://images.acmesports.sports/Under+Armour+M...
