In [38]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType


# Create the Spark session for this notebook (getOrCreate reuses an active one if present).
spark = (
    SparkSession.builder
    .appName("DelimiterExample")
    .getOrCreate()
)




In [39]:
# Read the pipe-delimited customer file; the first row is the header and
# column types are inferred by Spark.
df = (
    spark.read
    .option("delimiter", "|")
    .csv(
        "/home/jovyan/work/data/customer_data.csv",
        header=True,
        inferSchema=True,
    )
)

# Preview the first five rows
df.show(5)

+---+-------------+-----------+---------+-------------------+--------------+-------+-----+-------+---------+--------+---------+
|  H|Customer_Name|Customer_Id|Open_Date|Last_Consulted_Date|Vaccination_Id|Dr_Name|State|Country|Post_Code|     DOB|Is_Active|
+---+-------------+-----------+---------+-------------------+--------------+-------+-----+-------+---------+--------+---------+
|  D|        Jacob|     556270| 20011219|           20180614|           HPT|  Emily|  BOS|     UK|    59972|19700211|        N|
|  D|       Mathew|     228782| 20040810|           20070418|           FLU|   John|  NYC|    GER|    39371|19930503|        A|
|  D|       Mathew|     123982| 20171001|           20190908|           HPT|   Paul|  WAS|    CAN|    89702|19730131|        N|
|  D|        Sarah|     398432| 20201212|           20170218|           HPT|   Mark|   AZ|     BR|    51761|19880413|        A|
|  D|         John|     114905| 20061129|           20021020|           COV|  Emily|   FL|    GER|    22

In [40]:
# Print the schema Spark inferred for the loaded CSV (column names, types, nullability).
df.printSchema()

root
 |-- H: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Id: integer (nullable = true)
 |-- Open_Date: integer (nullable = true)
 |-- Last_Consulted_Date: integer (nullable = true)
 |-- Vaccination_Id: string (nullable = true)
 |-- Dr_Name: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Post_Code: integer (nullable = true)
 |-- DOB: integer (nullable = true)
 |-- Is_Active: string (nullable = true)



In [41]:
# Convert Open_Date, Last_Consulted_Date and DOB from yyyyMMdd values into date columns.
from pyspark.sql.functions import to_date, col

# Each column is cast to string first so to_date can parse it with the "yyyyMMdd" pattern.
df1 = df
for date_column in ("Open_Date", "Last_Consulted_Date", "DOB"):
    df1 = df1.withColumn(
        date_column,
        to_date(col(date_column).cast("string"), "yyyyMMdd"),
    )

# Preview the first five rows with the parsed dates
df1.show(5)

+---+-------------+-----------+----------+-------------------+--------------+-------+-----+-------+---------+----------+---------+
|  H|Customer_Name|Customer_Id| Open_Date|Last_Consulted_Date|Vaccination_Id|Dr_Name|State|Country|Post_Code|       DOB|Is_Active|
+---+-------------+-----------+----------+-------------------+--------------+-------+-----+-------+---------+----------+---------+
|  D|        Jacob|     556270|2001-12-19|         2018-06-14|           HPT|  Emily|  BOS|     UK|    59972|1970-02-11|        N|
|  D|       Mathew|     228782|2004-08-10|         2007-04-18|           FLU|   John|  NYC|    GER|    39371|1993-05-03|        A|
|  D|       Mathew|     123982|2017-10-01|         2019-09-08|           HPT|   Paul|  WAS|    CAN|    89702|1973-01-31|        N|
|  D|        Sarah|     398432|2020-12-12|         2017-02-18|           HPT|   Mark|   AZ|     BR|    51761|1988-04-13|        A|
|  D|         John|     114905|2006-11-29|         2002-10-20|           COV|  Emil

In [45]:
from pyspark.sql.functions import to_date, col, datediff, current_date, floor, months_between


# Age in completed years as of the last consultation date.
# Dividing a day count by 365.25 under-counts on exact birthdays
# (e.g. 2001-01-01 -> 2002-01-01 is 365 days, and 365 / 365.25 floors to 0),
# so count whole calendar months instead and convert those to years.
df2 = df1.withColumn(
    "age",
    floor(months_between(col("Last_Consulted_Date"), col("DOB")) / 12),
)

# Customer_Id is an identifier, not a quantity, so keep it as a string.
df2 = df2.withColumn("Customer_Id", col("Customer_Id").cast("string"))

# Keep only the business columns (drops the record-type marker column "H").
df3 = df2.select(
    "Customer_Name",
    "Customer_Id",
    "Open_Date",
    "Last_Consulted_Date",
    "Vaccination_Id",
    "Dr_Name",
    "State",
    "Country",
    "Post_Code",
    "DOB",
    "Is_Active",
    "age",
)

In [46]:
# Print the schema of df3 to check the column types of the selected columns.
df3.printSchema()

root
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Id: string (nullable = true)
 |-- Open_Date: date (nullable = true)
 |-- Last_Consulted_Date: date (nullable = true)
 |-- Vaccination_Id: string (nullable = true)
 |-- Dr_Name: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Post_Code: integer (nullable = true)
 |-- DOB: date (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- age: long (nullable = true)



In [None]:
import re

# JDBC connection properties.
# NOTE(review): the <...> values are placeholders. Supply the real values from
# environment variables or a secrets store instead of committing them here.
jdbc_url = "jdbc:mysql://<hostname>:<port>/<database>"
jdbc_properties = {
    "user": "<db_user>",
    "password": "<db_password>",
    "driver": "com.mysql.cj.jdbc.Driver"
}

# SQL template to create one table per country.
# The last column definition must not be followed by a comma, and the statement
# is sent without a trailing ';' because it is executed as a single JDBC statement.
# NOTE(review): the table uses Vaccination_Type / Doctor_Consulted while the
# DataFrame columns are Vaccination_Id / Dr_Name, and there is no column for
# "age" -- rename/select accordingly before writing rows into these tables.
create_table_sql_template = """
CREATE TABLE IF NOT EXISTS customer_data_{country} (
    Customer_Name VARCHAR(255) NOT NULL,
    Customer_Id VARCHAR(18) NOT NULL PRIMARY KEY,
    Open_Date DATE NOT NULL,
    Last_Consulted_Date DATE,
    Vaccination_Type CHAR(5),
    Doctor_Consulted VARCHAR(255),
    State CHAR(5),
    Country CHAR(5),
    Post_Code INT,
    DOB DATE,
    Is_Active CHAR(1)
)
"""

# Get the unique countries. The column is nullable, so drop nulls up front
# (calling .lower() on None would raise).
countries = [
    row["Country"]
    for row in df.select("Country").distinct().collect()
    if row["Country"] is not None
]

# spark.sql() only talks to Spark's own catalog and never reaches MySQL, so the
# DDL is executed over a real JDBC connection opened through the driver JVM.
# The MySQL connector JAR must be on the Spark driver classpath for this to work.
jvm = spark.sparkContext._jvm
connection = jvm.java.sql.DriverManager.getConnection(
    jdbc_url,
    jdbc_properties["user"],
    jdbc_properties["password"],
)
try:
    statement = connection.createStatement()
    try:
        # Loop through each country and create its table
        for country in countries:
            table_suffix = str(country).strip().lower()

            # The suffix is spliced into the table name, so only accept plain
            # identifier characters coming from the data file.
            if not re.fullmatch(r"[a-z0-9_]+", table_suffix):
                print(f"Skipped country with unsafe table name: {country!r}")
                continue

            create_table_sql = create_table_sql_template.format(country=table_suffix)
            statement.executeUpdate(create_table_sql)
            print(f"Created table for country: {country}")
    finally:
        statement.close()
finally:
    connection.close()
