In [None]:
# Import Packages

from pyspark.sql import SparkSession
from pyspark.sql.functions import add_months
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, BooleanType

# Reuse the active Spark session if one exists; otherwise start one named
# 'Store Address History' on the local master using all available cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Store Address History')
    .getOrCreate()
)

In [None]:
# Import data from 'liquor-sales-data/raw_data/' in ADLS

file_path = "/mnt/liquor-sales-data/raw_data/store_address_history.csv"

# Explicit schema for the store address history CSV, so column types do not
# depend on schema inference.
#
# Every field is declared nullable on purpose: Spark's file-based readers relax
# a user-supplied schema to nullable when loading, so `nullable=False` on
# "Is Current" / "Address Sequence" was never enforced (the schema printed later
# in this notebook reports `nullable = true` for every column). Declaring them
# nullable keeps the schema honest; add an explicit null check after the load
# if these columns must be populated.
schema = StructType([
    StructField("Store Number", StringType(), True),
    StructField("Address", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Zip Code", IntegerType(), True),
    StructField("County Number", StringType(), True),
    StructField("Latitude", DoubleType(), True),
    StructField("Longitude", DoubleType(), True),
    StructField("From Date", DateType(), True),
    StructField("To Date", DateType(), True),
    StructField("Is Current", BooleanType(), True),
    StructField("Address Sequence", IntegerType(), True)
])

# The first line of the file is treated as the header row, not as data.
df_address_history = spark.read.load(file_path, format='csv', header=True, schema=schema)

# +------------+--------------------+-------------+--------+-------------+---------+----------+----------+----------+----------+----------------+
# |Store Number|             Address|         City|Zip Code|County Number| Latitude| Longitude| From Date|   To Date|Is Current|Address Sequence|
# +------------+--------------------+-------------+--------+-------------+---------+----------+----------+----------+----------+----------------+
# |    STO_2106|        2217 COLLEGE|  CEDAR FALLS|   50613|        CNT_7|42.517182|-92.455796|2012-01-05|2020-09-24|      true|               1|
# |    STO_2113|         1119 MARKET|       GOWRIE|   50543|       CNT_94| 42.28057|-94.289457|2012-01-04|2020-02-03|      true|               1|
# |    STO_2130|        617 SYCAMORE|     WATERLOO|   50703|        CNT_7|42.497854|-92.335358|2012-01-05|2020-09-24|      true|               1|
# |    STO_2152|       202 4TH NORTH|     ROCKWELL|   50469|       CNT_17|42.986351|-93.188172|2012-01-09|2016-02-25|      true|               1|
# |    STO_2161|      FIRST AND MAIN|FORT ATKINSON|   52144|       CNT_96|     NULL|      NULL|2012-01-09|2012-07-09|      true|               1|
# |    STO_2178|  618 ROSSVILLE ROAD|       WAUKON|   52172|        CNT_3|43.262114|-91.473634|2012-01-09|2020-09-25|      true|               1|
# |    STO_2190|     1460 2ND AVENUE|   DES MOINES|   50314|       CNT_77| 41.60566|-93.619787|2012-01-03|2020-09-30|      true|               1|
# |    STO_2191|           1013 MAIN|       KEOKUK|   52632|       CNT_56|40.400038|-91.387797|2012-01-03|2020-09-30|      true|               1|
# |    STO_2200|       619 EAST MAIN|     SAC CITY|   50583|       CNT_81|42.421341|-94.974011|2012-01-04|2020-09-28|      true|               1|
# |    STO_2205| 900 EAST WASHINGTON|     CLARINDA|   51632|       CNT_73|40.739231|-95.027238|2012-01-03|2015-12-28|      true|               1|
# |    STO_2228|            53 GREEN|    WINTERSET|   50273|       CNT_61|41.336405|-94.013374|2012-01-09|2020-09-25|      true|               1|
# |    STO_2233|2508 ENTERPRISE A...|  SPIRIT LAKE|   51360|       CNT_30|43.416433|-95.126786|2012-01-03|2020-09-30|      true|               1|
# |    STO_2238|3200 ADVENTURELAN...|      ALTOONA|   50009|       CNT_77|41.658513| -93.49924|2012-01-03|2019-04-25|     false|               1|
# |    STO_2238|305 34TH AVENUE N...|      ALTOONA|   50009|       CNT_77|     NULL|      NULL|2019-05-09|2020-09-17|      true|               2|
# |    STO_2248|3500 INGERSOLL AV...|   DES MOINES|   50312|       CNT_77|41.586319|-93.664182|2012-01-09|2020-09-24|      true|               1|
# |    STO_2260|    509 FIRST AVENUE|        PERRY|   50220|       CNT_25|41.832248|-94.106557|2012-01-09|2012-01-16|      true|               1|
# |    STO_2285|     401 EAST MARKET|    IOWA CITY|   52240|       CNT_52|41.663541|-91.529854|2012-01-09|2020-09-30|      true|               1|
# |    STO_2290|215 WEST MILWAUKE...|   STORM LAKE|   50588|       CNT_11|42.647492|-95.202405|2012-01-04|2016-10-04|      true|               1|

+------------+--------------------+-------------+--------+-------------+---------+----------+----------+----------+----------+----------------+
|Store Number|             Address|         City|Zip Code|County Number| Latitude| Longitude| From Date|   To Date|Is Current|Address Sequence|
+------------+--------------------+-------------+--------+-------------+---------+----------+----------+----------+----------+----------------+
|    STO_2106|        2217 COLLEGE|  CEDAR FALLS|   50613|        CNT_7|42.517182|-92.455796|2012-01-05|2020-09-24|      true|               1|
|    STO_2113|         1119 MARKET|       GOWRIE|   50543|       CNT_94| 42.28057|-94.289457|2012-01-04|2020-02-03|      true|               1|
|    STO_2130|        617 SYCAMORE|     WATERLOO|   50703|        CNT_7|42.497854|-92.335358|2012-01-05|2020-09-24|      true|               1|
|    STO_2152|       202 4TH NORTH|     ROCKWELL|   50469|       CNT_17|42.986351|-93.188172|2012-01-09|2016-02-25|      true|          

In [None]:
# Transform
#
# 1. Shift 'From Date' and 'To Date' back by 543 years (expressed in months,
#    since add_months is the date-shifting function imported in this notebook).
#    NOTE(review): 543 is the Buddhist Era -> Gregorian year offset, so this is
#    presumably a calendar conversion -- confirm against the raw file, because
#    the sample rows shown in this notebook already display years 2012-2020.
# 2. Cast the boolean 'Is Current' flag to its string form.
#
# This cell reassigns df_address_history from itself, so it is not idempotent:
# re-running it without reloading the data shifts the dates a further 543 years.
df_address_history = (
    df_address_history
    .withColumn(
        'From Date',
        add_months(df_address_history['From Date'], -543 * 12),
    )
    .withColumn(
        'To Date',
        add_months(df_address_history['To Date'], -543 * 12),
    )
    .withColumn(
        'Is Current',
        df_address_history['Is Current'].cast('string'),
    )
)


root
 |-- Store Number: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zip Code: integer (nullable = true)
 |-- County Number: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- From Date: date (nullable = true)
 |-- To Date: date (nullable = true)
 |-- Is Current: string (nullable = true)
 |-- Address Sequence: integer (nullable = true)



In [None]:
# Export to 'liquor-sales-data/transformed_data/' in ADLS

output_path = "/mnt/liquor-sales-data/transformed_data/store_address_history/"

# Write the transformed frame as Parquet; 'overwrite' replaces whatever
# already exists at output_path.
df_address_history.write.mode('overwrite').parquet(output_path)