In [7]:
from pyspark.sql import SparkSession
import mysql.connector
from pyspark.sql import SQLContext
import os

In [8]:
# Start the Spark session for this notebook, or reuse one that already exists;
# "Basics" is the application name given to Spark.
spark = (
    SparkSession.builder
    .appName("Basics")
    .getOrCreate()
)

In [9]:
# Connection to the MySQL "staging" database, used for the drop/create DDL.
# Host and credentials can be overridden through environment variables so real
# secrets never have to be written into the notebook; when the variables are
# unset the defaults apply (localhost, root, empty password).
mydb = mysql.connector.connect(
    host=os.environ.get("MYSQL_HOST", "localhost"),
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ.get("MYSQL_PASSWORD", ""),
    database="staging",
)

In [10]:
# Shared cursor on the MySQL connection; dropCreateTable runs its DDL through it.
# buffered=True asks the connector to fetch a statement's whole result set
# right after executing it.
mycursor = mydb.cursor(buffered=True)

In [11]:
def dropCreateTable(schema, tablename):
    """Drop ``tablename`` if it exists and recreate it from ``schema``.

    Args:
        schema: Parenthesised column-definition text that is appended directly
            after ``CREATE TABLE <tablename>``.
        tablename: Name of the table to rebuild. It is concatenated into the
            statement (identifiers cannot be bound as query parameters), so it
            must come from a trusted or validated source.

    Uses the module-level ``mycursor`` and ``mydb``. Failures are reported with
    print() and are not re-raised, so a caller's loop keeps going.
    """
    try:
        mycursor.execute("DROP TABLE IF EXISTS " + tablename)
        mycursor.execute("CREATE TABLE " + tablename + schema)
        # commit must be *called*; a bare `mydb.commit` only looks the method up.
        mydb.commit()
    except Exception as e:
        # NOTE: MySQL commits DDL implicitly, so this rollback cannot bring back
        # a table that was already dropped; it only clears the pending
        # transaction state.
        mydb.rollback()
        print("Error while recreating table " + tablename)
        print(e)

In [12]:
def populateData(df, country, tablename):
    """Copy one country's rows from a Spark view into a MySQL table over JDBC.

    Args:
        df: Name of the Spark table or temp view to read from. Despite the
            parameter name this is a name string, not a DataFrame.
        country: Value matched against the view's ``country`` column.
        tablename: Destination table, passed to the JDBC writer as ``dbtable``.

    The MySQL host, user and password can be overridden with the MYSQL_HOST,
    MYSQL_USER and MYSQL_PASSWORD environment variables; the defaults are
    localhost, root and an empty password. Failures are reported with print()
    and are not re-raised.
    """
    try:
        source = spark.table(df)
        # Compare through the DataFrame API instead of formatting the value into
        # SQL text, so a country containing a quote cannot break or alter the
        # query.
        rows = source.where(source["country"] == country)

        host = os.environ.get("MYSQL_HOST", "localhost")
        (
            rows.write.format("jdbc")
            .option("url", "jdbc:mysql://" + host + "/staging")
            .option("driver", "com.mysql.jdbc.Driver")
            .option("dbtable", tablename)
            .option("user", os.environ.get("MYSQL_USER", "root"))
            .option("password", os.environ.get("MYSQL_PASSWORD", ""))
            # With mode("overwrite"), truncate=true empties the existing table
            # instead of dropping and recreating it, so the column definitions
            # created beforehand are kept.
            .option("truncate", "true")
            .mode("overwrite")
            .save()
        )
    except Exception as e:
        print("Error in the data added!!!")
        print(e)

In [68]:
# Input file (relative path); it is read as '|'-delimited CSV with a header row.
filename = 'data.txt'

In [69]:
# Load the input file as a DataFrame: fields are separated by '|' and the
# first line supplies the column names.
df = (
    spark.read
    .option("delimiter", "|")
    .option("header", True)
    .csv(filename)
)

In [70]:
# Preview the loaded rows to check that the columns were split as expected.
df.show()

+---+-------------+-----------+---------+-------------------+--------------+-------+-----+-------+--------+---------+
|  H|Customer_Name|Customer_Id|Open_Date|Last_Consulted_Date|Vaccination_Id|Dr_Name|State|Country|     DOB|Is_Active|
+---+-------------+-----------+---------+-------------------+--------------+-------+-----+-------+--------+---------+
|  D|         Alex|     123457| 20101012|           20121013|           MVD|   Paul|   SA|    USA|06031987|        A|
|  D|         John|     123458| 20101012|           20121013|           MVD|   Paul|   TN|    IND|06031987|        A|
|  D|       Mathew|     123459| 20101012|           20121013|           MVD|   Paul|  WAS|   PHIL|06031987|        A|
|  D|         Matt|      12345| 20101012|           20121013|           MVD|   Paul|  BOS|    NYC|06031987|        A|
|  D|        Jacob|       1256| 20101012|           20121013|           MVD|   Paul|  VIC|     AU|06031987|        A|
|  D|        Alex1|     123457| 20101012|           2012

In [71]:
# Make the raw DataFrame queryable from Spark SQL under the name 'datainput'.
df.createOrReplaceTempView('datainput')

In [72]:
# One row per distinct country value in the input; the final loop creates and
# fills one MySQL table for each of these values.
country = spark.sql('SELECT DISTINCT country FROM datainput')

In [73]:
# Show the distinct country values that will each get their own table.
country.show()

+-------+
|country|
+-------+
|     AU|
|    USA|
|   PHIL|
|    IND|
|    NYC|
+-------+



In [78]:
# Column definitions for every per-country table. dropCreateTable appends this
# text directly after "CREATE TABLE <name>", which is why it starts with "(".
# There is one column per alias selected into processed_data.
schema = """(
CustomerName VARCHAR(255) PRIMARY KEY,
CustomerID VARCHAR(18) NOT NULL,
CustomerOpenDate DATE NOT NULL,
LastConsultedDate DATE,
VaccinationType CHAR(5),
DoctorConsulted  CHAR(255),
State CHAR(5),
Country CHAR(5),
PostCode INT(5),
DateofBirth  DATE,
ActiveCustomer CHAR(1)
);"""

In [79]:
# Reshape the raw rows into the layout of the target tables.
# Dates arrive as digit strings (Open_Date and Last_Consulted_Date as yyyyMMdd,
# DOB as ddMMyyyy). They are parsed with TO_DATE so they become real DATE
# values and a missing input stays NULL. Assembling the value with
# CONCAT_WS + SUBSTRING would turn a NULL input into '' (CONCAT_WS skips NULL
# arguments), which is not a valid date.
# PostCode is a constant placeholder: the input has no post code column.
# Rows without a Customer_Id or Open_Date are dropped because the target
# columns are NOT NULL.
processed_data = spark.sql("""SELECT Customer_Name AS CustomerName
, Customer_Id AS CustomerId
, TO_DATE(Open_Date, 'yyyyMMdd') AS CustomerOpenDate
, TO_DATE(Last_Consulted_Date, 'yyyyMMdd') AS LastConsultedDate
, Vaccination_Id AS VaccinationType
, Dr_Name AS DoctorConsulted
, State
, Country
, 1 AS PostCode
, TO_DATE(DOB, 'ddMMyyyy') AS DateofBirth
, Is_Active AS ActiveCustomer
FROM datainput
WHERE Customer_Id IS NOT NULL AND Open_Date IS NOT NULL""")

In [80]:
# Register the transformed rows as the view name that is handed to populateData.
processed_data.createOrReplaceTempView('proc_data_sql')

In [81]:
# Iterate over the distinct-country rows locally, one Row at a time.
# NOTE(review): this looks like a one-shot iterator — once the loop that
# consumes it has run, re-run this cell before running that loop again.
iterator = country.rdd.toLocalIterator()

In [82]:
# Build one MySQL table per distinct country and fill it with that country's rows.
for tab in iterator:
    code = tab["country"]
    # The country value becomes part of a table name that is concatenated into
    # DDL, so only letters, digits and underscores are accepted. This also
    # skips a NULL country, which would otherwise raise on the concatenation.
    if code is None or not code.replace("_", "").isalnum():
        print("Skipping country with unusable table suffix: {!r}".format(code))
        continue
    tablename = "staging.customer_vaccination_" + code
    dropCreateTable(schema, tablename)
    populateData("proc_data_sql", code, tablename)