In [457]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType, FloatType, DecimalType, DateType, LongType
from pyspark.sql.functions import to_date,col
from pyspark import StorageLevel

In [458]:
# Single SparkSession for the whole ETL. Hive support lets the CREATE DATABASE /
# CREATE TABLE statements further down persist through the Hive metastore.
spark = (
    SparkSession.builder
    .appName("etl_project")
    .enableHiveSupport()
    .getOrCreate()
)

In [459]:
# Raw CSV header name -> pipeline column name.
# Every target is lowercase snake_case. 'Zip Code' previously mapped to the
# mixed-case 'zip_Code', the only exception; Spark SQL resolves column names
# case-insensitively by default, so code referring to the old spelling still
# resolves.
columns_mapping = {
    'Invoice/Item Number': 'invoice_item_number',
    'Date': 'date',
    'Store Number': 'store_number',
    'Store Name': 'store_name',
    'Address': 'address',
    'City': 'city',
    'Zip Code': 'zip_code',
    'Store Location': 'store_location',
    'County Number': 'county_number',
    'County': 'county',
    'Category': 'category',
    'Category Name': 'category_name',
    'Vendor Number': 'vendor_number',
    'Vendor Name': 'vendor_name',
    'Item Number': 'item_number',
    'Item Description': 'item_description',
    'Pack': 'pack',
    'Bottle Volume (ml)': 'bottle_volume_ml',
    'State Bottle Cost': 'state_bottle_cost',
    'State Bottle Retail': 'state_bottle_retail',
    'Bottles Sold': 'bottles_sold',
    'Sale (Dollars)': 'sale_usd',
    'Volume Sold (Liters)': 'volume_sold_liters',
    'Volume Sold (Gallons)': 'volume_sold_gallons',
}



In [460]:
# Explicit read schema for the raw CSV, keyed by the original header names.
# Every field is nullable. The money columns (cost, retail, sale) are read as
# strings. Presumably the raw values carry formatting that would not parse as
# numbers (TODO confirm against the extract).
schema = [
    StructField(name, dtype, True)
    for name, dtype in (
        ('Invoice/Item Number', StringType()),
        ('Date', StringType()),
        ('Store Number', IntegerType()),
        ('Store Name', StringType()),
        ('Address', StringType()),
        ('City', StringType()),
        ('Zip Code', StringType()),
        ('Store Location', StringType()),
        ('County Number', IntegerType()),
        ('County', StringType()),
        ('Category', IntegerType()),
        ('Category Name', StringType()),
        ('Vendor Number', IntegerType()),
        ('Vendor Name', StringType()),
        ('Item Number', IntegerType()),
        ('Item Description', StringType()),
        ('Pack', IntegerType()),
        ('Bottle Volume (ml)', IntegerType()),
        ('State Bottle Cost', StringType()),
        ('State Bottle Retail', StringType()),
        ('Bottles Sold', IntegerType()),
        ('Sale (Dollars)', StringType()),
        ('Volume Sold (Liters)', DoubleType()),
        ('Volume Sold (Gallons)', DoubleType()),
    )
]

In [461]:
# Wrap the field list in a StructType so it can be handed to DataFrameReader.schema().
end_schema = StructType(schema)

In [462]:
# Load the raw extract with the explicit schema (no inference pass).
# multiLine lets a quoted field that spans several physical lines be parsed as
# one record. The raw 'Date' string is also parsed into a DateType column,
# 'date_fixed', using the MM/dd/yyyy pattern.
df = (
    spark.read
    .option("header", "true")
    .option("multiLine", "true")
    .schema(end_schema)
    .csv("Iowa_Liquor_Sales__0.csv")
    .withColumn("date_fixed", to_date(col("Date"), 'MM/dd/yyyy'))
)

# Rename every column through columns_mapping. Names absent from the mapping
# (here only 'date_fixed') pass through unchanged.
df = df.select(*(col(name).alias(columns_mapping.get(name, name)) for name in df.columns))

In [463]:
# Sanity-check the renamed column names and their types.
df.printSchema()

root
 |-- invoice_item_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- store_number: integer (nullable = true)
 |-- store_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip_Code: string (nullable = true)
 |-- store_location: string (nullable = true)
 |-- county_number: integer (nullable = true)
 |-- county: string (nullable = true)
 |-- category: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- vendor_number: integer (nullable = true)
 |-- vendor_name: string (nullable = true)
 |-- item_number: integer (nullable = true)
 |-- item_description: string (nullable = true)
 |-- pack: integer (nullable = true)
 |-- bottle_volume_ml: integer (nullable = true)
 |-- state_bottle_cost: string (nullable = true)
 |-- state_bottle_retail: string (nullable = true)
 |-- bottles_sold: integer (nullable = true)
 |-- sale_usd: string (nullable = true)
 |-- volume_sold_liters: double (nulla

In [464]:
# Remove the raw 'date' string (its parsed form lives in 'date_fixed') and
# the 'store_location' column.
df = df.drop("date").drop("store_location")

In [465]:
# Replace missing category ids with the sentinel -1.
df = df.fillna({"category": -1})

In [466]:
# Spot-check the -1 sentinel fill (first 20 rows, truncated display).
df.select(col('category')).show(n=20)

+--------+
|category|
+--------+
|      -1|
|      -1|
|      -1|
|      -1|
|      -1|
|      -1|
| 1701100|
|      -1|
| 1701100|
|      -1|
|      -1|
|      -1|
|      -1|
| 1701100|
|      -1|
|      -1|
|      -1|
|      -1|
| 1081200|
|      -1|
+--------+
only showing top 20 rows



In [467]:
# Replace missing category names with the placeholder 'no_category'.
df = df.fillna({"category_name": 'no_category'})

In [468]:
# Spot-check the 'no_category' placeholder fill (first 20 rows, truncated display).
df.select(col('category_name')).show(n=20)

+--------------------+
|       category_name|
+--------------------+
|         no_category|
|         no_category|
|         no_category|
|         no_category|
|         no_category|
|         no_category|
|DECANTERS & SPECI...|
|         no_category|
|DECANTERS & SPECI...|
|         no_category|
|         no_category|
|         no_category|
|         no_category|
|DECANTERS & SPECI...|
|         no_category|
|         no_category|
|         no_category|
|         no_category|
|      CREAM LIQUEURS|
|         no_category|
+--------------------+
only showing top 20 rows



In [469]:
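# Optional: cache the cleaned frame before querying it through the ILS view.
# Without it, every Spark action on `df` re-reads and re-parses the CSV.
# df.persist(StorageLevel.MEMORY_AND_DISK)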

In [470]:
# Expose the cleaned frame to spark.sql() under the session-scoped name ILS.
df.createOrReplaceTempView(name="ILS")

In [471]:
# Target Hive database for the star schema.
db_name = "ILS_db"

# Planned table names. NOTE(review): despite the variable name, this list also
# contains the fact table, and it is not referenced by any cell shown here.
dimensions = [
    "fact_Sales",
    "dim_Category",
    "dim_Store",
    "dim_Vendor",
    "dim_Items",
    "dim_County",
    "dim_Dates",
]

In [472]:
# Distinct (category, category_name) pairs from the ILS view. This is lazy:
# nothing executes until an action runs on it.
# NOTE(review): not used by any cell shown here.
sparkDF = spark.sql(
    "select distinct category, category_name from ILS"
)

In [473]:
# Create the target database on first run; IF NOT EXISTS makes re-runs a no-op.
spark.sql('CREATE database if not exists {}'.format(db_name))

DataFrame[]

In [None]:
# Category dimension table in the target database. `id` receives the integer
# category code; IF NOT EXISTS keeps the cell safe to re-run.
# No trailing ';': spark.sql() executes exactly one statement, and older Spark
# SQL parsers reject a terminating semicolon.
spark.sql(f'''
    CREATE TABLE IF NOT EXISTS {db_name}.dim_Category (
        id INT,
        category_name STRING
    )
''')

In [None]:
# Load the category dimension from the cleaned ILS temp view.
# - The table is qualified with db_name because it was created in that
#   database; no USE statement is issued, so an unqualified name would resolve
#   against the session's current (default) database.
# - INSERT OVERWRITE (not INSERT INTO) keeps the cell idempotent: re-running
#   it replaces the rows instead of appending duplicate dimension entries.
# NOTE(review): DISTINCT is over (category, category_name) pairs. If one
# category id appears with several name spellings, `id` will not be unique.
# Confirm this against the data.
spark.sql(f'''
    INSERT OVERWRITE TABLE {db_name}.dim_Category
    SELECT DISTINCT category, category_name FROM ILS
''')