In [0]:
%run "/Workspace/Shared/Connection and Helper Function"

In [0]:
from pyspark.sql.functions import when, col, lit, current_date, year, coalesce

### Customer Data Curation

In [0]:
# Raw customer extract, read as Parquet from the landing mount.
CUSTOMERS_LANDING_PATH = '/mnt/adlslanding/customers/'
customers_raw = spark.read.parquet(CUSTOMERS_LANDING_PATH)

In [0]:
# Keep only landing records that carry every attribute the SCD2 merge below relies on.
# SignupDate and BirthYear are validated *after* casting: a value that is present but not
# parseable casts to NULL. A NULL SignupDate would otherwise be written to the curated table,
# and a BirthYear that does not cast to int produces a NULL Age downstream, which matches none
# of the age thresholds and is mislabelled 'Over 65'.
customers_cleaned = (
    customers_raw
    .withColumn('SignupDate', col('SignupDate').cast('date'))
    .filter(
        col('CustomerID').isNotNull()
        & col('Name').isNotNull()
        & col('Email').isNotNull()
        & col('CityID').isNotNull()
        & col('BirthYear').cast('int').isNotNull()
        & col('Gender').isNotNull()
        & col('SignupDate').isNotNull()
    )
)

In [0]:
# Replace BirthYear with an age bracket. Age is a plain calendar-year difference
# (the customer's birthday within the year is ignored).
# NOTE: the 'Over 65' bucket includes age 65 itself (the last threshold is < 65). The labels are
# kept verbatim because they are compared against AGE_GROUP values already stored in the table.
# NOTE: a NULL Age (BirthYear not castable to int) matches no threshold and lands in 'Over 65',
# so such rows need to be excluded before this step.
age_years = year(current_date()) - col('BirthYear').cast('int')
age_group = (
    when(col('Age') < 25, 'Under 25')
    .when(col('Age') < 45, '25-44')
    .when(col('Age') < 65, '45-64')
    .otherwise('Over 65')
)
customers_current_day = (
    customers_cleaned
    .withColumn('Age', age_years)
    .withColumn('AgeGroup', age_group)
    .drop('Age', 'BirthYear')
)

In [0]:
# Every row currently in CURATED.CUSTOMER (active and inactive versions).
# The JDBC `dbtable` option accepts a parenthesized subquery with an alias.
customers_table = "(SELECT * FROM CURATED.CUSTOMER) AS tmp"

customer_jdbc_read_options = {
    "url": jdbc_url,
    "dbtable": customers_table,
    "user": username,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}
customers_existing = spark.read.format("jdbc").options(**customer_jdbc_read_options).load()

In [0]:
# Split the stored customer rows into the current (active) version of each customer and
# everything else.
# "Inactive" is defined as "not explicitly active" (null-safe) rather than ACTIVE_IND == 'N'.
# CURATED.CUSTOMER is later rewritten in overwrite mode, so any row that fell into neither
# branch (e.g. ACTIVE_IND is NULL or an unexpected value) would be silently deleted.
# Such rows are now carried forward untouched as history.
customers_existing_inactive = customers_existing.filter(~col('ACTIVE_IND').eqNullSafe('Y'))
customers_existing_active = customers_existing.filter(col('ACTIVE_IND') == 'Y')

In [0]:
# Pair each of today's customers with an active row that has the same id AND identical tracked
# attributes (name, email, city, age group). Customers without such a row come out with NULL
# existing-side columns. Gender and SignupDate are not part of the comparison, so a change to
# only those fields does not open a new version.
tracked_attributes_match = (
    (customers_current_day['CustomerID'] == customers_existing_active['CUSTOMER_ID'])
    & (customers_current_day['Name'] == customers_existing_active['NAME'])
    & (customers_current_day['Email'] == customers_existing_active['EMAIL'])
    & (customers_current_day['CityID'] == customers_existing_active['CITY_ID'])
    & (customers_current_day['AgeGroup'] == customers_existing_active['AGE_GROUP'])
)
customer_current_existing_join = customers_current_day.join(
    customers_existing_active, tracked_attributes_match, 'left_outer'
)

In [0]:
# Today's customers with no identical active row are either brand-new ids or changed records.
# Each becomes a new active version in the curated schema: effective from today, with an
# open (NULL) end date.
customers_new_changed_activate = (
    customer_current_existing_join
    .filter(customers_existing_active['CUSTOMER_ID'].isNull())
    .select(
        customers_current_day['CustomerID'].alias('CUSTOMER_ID'),
        customers_current_day['Name'].alias('NAME'),
        customers_current_day['Email'].alias('EMAIL'),
        customers_current_day['CityID'].alias('CITY_ID'),
        customers_current_day['Gender'].alias('GENDER'),
        customers_current_day['SignupDate'].alias('SIGNUP_DATE'),
        customers_current_day['AgeGroup'].alias('AGE_GROUP'),
        current_date().alias('EFFECTIVE_START_DATE'),
        lit(None).cast('date').alias('EFFECTIVE_END_DATE'),
        lit('Y').alias('ACTIVE_IND'),
    )
)

In [0]:
# Active rows whose tracked attributes all matched today's data are carried forward as stored.
customer_curated_columns = [
    'CUSTOMER_ID', 'NAME', 'EMAIL', 'CITY_ID', 'GENDER', 'SIGNUP_DATE', 'AGE_GROUP',
    'EFFECTIVE_START_DATE', 'EFFECTIVE_END_DATE', 'ACTIVE_IND',
]
customers_unchanged = (
    customer_current_existing_join
    .filter(customers_existing_active['CUSTOMER_ID'].isNotNull())
    .select([customers_existing_active[name] for name in customer_curated_columns])
)

In [0]:
# Close out active rows that no longer reflect today's data. A row is closed when either:
#   - the customer is absent from today's extract (deleted), or
#   - any tracked attribute (name, email, city, age group) differs (changed).
# The closed row keeps its original start date, is end-dated today and flagged inactive.
# Deletion is tested explicitly instead of comparing against '' placeholders; those relied on
# implicit string coercion for non-string columns such as CITY_ID.
# The comparisons are null-safe. A NULL stored attribute can never satisfy the `==` join that
# decides "unchanged", so such a customer always gets a new active version. With a plain
# `!=` the old row would never be closed, leaving two active rows for the same customer.
customers_deleted_changed_inactivate = (
    customers_existing_active
    .join(
        customers_current_day,
        customers_existing_active['CUSTOMER_ID'] == customers_current_day['CustomerID'],
        'left_outer',
    )
    .filter(
        customers_current_day['CustomerID'].isNull()
        | ~customers_existing_active['NAME'].eqNullSafe(customers_current_day['Name'])
        | ~customers_existing_active['EMAIL'].eqNullSafe(customers_current_day['Email'])
        | ~customers_existing_active['CITY_ID'].eqNullSafe(customers_current_day['CityID'])
        | ~customers_existing_active['AGE_GROUP'].eqNullSafe(customers_current_day['AgeGroup'])
    )
    .select(
        customers_existing_active['CUSTOMER_ID'],
        customers_existing_active['NAME'],
        customers_existing_active['EMAIL'],
        customers_existing_active['CITY_ID'],
        customers_existing_active['GENDER'],
        customers_existing_active['SIGNUP_DATE'],
        customers_existing_active['AGE_GROUP'],
        customers_existing_active['EFFECTIVE_START_DATE'],
    )
    .withColumn('EFFECTIVE_END_DATE', current_date())
    .withColumn('ACTIVE_IND', lit('N'))
)

In [0]:
# Assemble the complete replacement contents of CURATED.CUSTOMER: newly opened versions,
# versions closed today, untouched history, and active rows that did not change.
#
# unionByName instead of positional union: customers_existing_inactive comes from SELECT * on
# the target table, whose physical column order is not controlled by this notebook.
#
# localCheckpoint(eager=True) materializes the result before the write. All four inputs lazily
# read CURATED.CUSTOMER, and a JDBC write in "overwrite" mode empties that table (drop/recreate,
# or truncate) *before* executing the plan. An unmaterialized plan would therefore read back an
# empty table and wipe all stored history. A local checkpoint also truncates the lineage, so a
# lost block makes the job fail instead of being silently recomputed from the overwritten table,
# which a plain cache() would allow.
customer_final = (
    customers_new_changed_activate
    .unionByName(customers_deleted_changed_inactivate)
    .unionByName(customers_existing_inactive)
    .unionByName(customers_unchanged)
    .localCheckpoint(eager=True)
)

In [0]:
# Replace CURATED.CUSTOMER with the merged snapshot.
# NOTE(review): customer_final's plan reads this same table (via customers_existing). It must be
# materialized before this call; otherwise the overwrite empties the table before the read runs
# and the stored history is lost.
customer_final.write.mode("overwrite").jdbc(url=jdbc_url, table="CURATED.CUSTOMER", properties=connection_properties)

### Cities Curation

In [0]:
# Raw city-pair distance extract, read as Parquet from the landing mount.
CITIES_LANDING_PATH = '/mnt/adlslanding/cities/'
cities_raw = spark.read.parquet(CITIES_LANDING_PATH)

In [0]:
# Keep only city-pair rows where both endpoints (id and name) and the distance are present.
# (An explicit isNotNull predicate is used rather than dropna, which would also drop NaN distances.)
cities_cleaned = cities_raw.filter(
    col('CityID_1').isNotNull()
    & col('CityName_1').isNotNull()
    & col('CityID_2').isNotNull()
    & col('CityName_2').isNotNull()
    & col('Distance_km').isNotNull()
)

In [0]:
# Map landing column names onto the CURATED.CITY_DISTANCE schema, then fix the column types
# (ids as int, distance as float).
# NOTE(review): values that fail these casts become NULL after the not-null filter has run; confirm
# whether the landing data can contain non-numeric ids or distances.
cities_final = (
    cities_cleaned
    .withColumnRenamed('CityID_1', 'CITY_ID')
    .withColumnRenamed('CityID_2', 'CITY_ID_TO')
    .withColumnRenamed('CityName_1', 'CITY_NAME')
    .withColumnRenamed('CityName_2', 'CITY_NAME_TO')
    .withColumnRenamed('Distance_km', 'DISTANCE')
    .withColumn('CITY_ID', col('CITY_ID').cast('int'))
    .withColumn('CITY_ID_TO', col('CITY_ID_TO').cast('int'))
    .withColumn('DISTANCE', col('DISTANCE').cast('float'))
)

In [0]:
# Fully replace CURATED.CITY_DISTANCE with today's cleaned city-pair distances.
cities_final.write.mode("overwrite").jdbc(url=jdbc_url, table="CURATED.CITY_DISTANCE", properties=connection_properties)

In [0]:
# Report the combined row count of both curated outputs back to the caller
# (e.g. dbutils.notebook.run). The exit value is a string in the dbutils API, so the
# integer count is converted explicitly instead of passing an int.
curated_row_count = customer_final.count() + cities_final.count()
dbutils.notebook.exit(str(curated_row_count))