In [None]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from delta import *
import pandas as pd
import time


In [None]:
# Build a Delta-enabled Spark session.
# Use .master('yarn') to run on the cluster, or .master('local[*]') to run locally:
#   SparkSession.builder.appName("DataSchema").master('local[*]').getOrCreate()
builder = (
    SparkSession.builder.appName("LiquorSale").master('yarn')
    # Register Delta Lake's SQL extension and catalog so Delta tables can be written and
    # read through the regular spark_catalog (used by saveAsTable further down).
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Wrap the builder with the delta-spark helper before creating the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Disable logging.
# Turn off the "org" and "akka" log4j logger hierarchies on the driver JVM.
logger = spark._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.OFF)
logger.LogManager.getLogger("akka").setLevel(logger.Level.OFF)

# NOTE: "spark.driver.log.level" does not appear to be a Spark runtime setting, so
# spark.conf.set(...) on it had no visible effect. SparkContext.setLogLevel is the
# documented way to change the log level of a running session.
spark.sparkContext.setLogLevel("OFF")


In [None]:
# Create the data schema and enforce it on read (no type-inference pass over the files).

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
custom_schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Store", StringType(), True),
    StructField("Address", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Zip", IntegerType(), True),
    StructField("CountyNumber", IntegerType(), True),
    StructField("County", StringType(), True),
    StructField("Category", IntegerType(), True),
    StructField("CategoryName", StringType(), True),
    StructField("VendorNumber", IntegerType(), True),
    StructField("VendorName", StringType(), True),
    StructField("ItemNumber", IntegerType(), True),
    StructField("ItemDescription", StringType(), True),
    StructField("StateBottleCost", DoubleType(), True),
    StructField("StateBottleRetail", DoubleType(), True),
    StructField("BottleSold", IntegerType(), True),
    StructField("SaleDollars", DoubleType(), True),
    StructField("VolumeSoldLiters", DoubleType(), True),
    StructField("VolumeSoldGallons", DoubleType(), True)
])

# header=True: with header=False the Date column contained the literal "Date" (it shows up
# later as the bogus month "Da"), i.e. the CSV header line was being read as a data row.
# The explicit schema above is still what names and types the columns.
#
# cache() is lazy: the data is materialised by the first action and then reused by the
# many actions in the cells below. Unpersisting right here (as this cell used to) meant
# the cache was dropped before it was ever filled. Call df.unpersist() once the
# analysis is finished instead.
df = spark.read.csv("/dis_materials/liqour1", header=True, schema=custom_schema).cache()

In [None]:
# DATA CLEANING AND PREPROCESSING
# Size of the data: count() runs a Spark job, while the column count comes from the schema.
num_rows, num_cols = df.count(), len(df.columns)
print(f"There are {num_rows:,} rows and {num_cols} columns.\n")

# Summary statistics for a few descriptive (string) columns
df.describe('Date', 'City', 'CategoryName', 'ItemDescription').show()


In [None]:
  
# Row/column counts (count() triggers a Spark job; len(df.columns) reads the schema only)
num_rows, num_cols = df.count(), len(df.columns)

# Summary statistics for the selected columns, shown before the size line
df.describe('Date', 'City', 'CategoryName', 'ItemDescription').show()

print(f"There are {num_rows:,} rows and {num_cols} columns.\n")


In [None]:
# DATA CLEANING AND PREPROCESSING  (BASELINE CODE)
# How many rows and columns does the data have? Both are computed up front.
num_rows, num_cols = df.count(), len(df.columns)

# Summary statistics for the selected columns, on the DataFrame as-is
df.describe('Date', 'City', 'CategoryName', 'ItemDescription').show()

# Report the size computed above
print(f"There are {num_rows:,} rows and {num_cols} columns.\n")


In [None]:
# DATA CLEANING AND PREPROCESSING  (OPTIMIZATION)
# How many rows and columns does our data have?
num_rows = df.count()
num_cols = len(df.columns)

# Experiment: run describe() on a 4-partition copy of the data.
# The copy lives in its own variable on purpose. repartition() is a full shuffle, and
# rebinding `df` to it (as this cell used to) silently pushed every later query in the
# notebook through that extra shuffle into only 4 partitions.
df_4_partitions = df.repartition(4)

# Show descriptive statistics for selected columns
df_4_partitions.select('Date', 'City', 'CategoryName', 'ItemDescription').describe().show()

# Print the results
print("There are {:,} rows and {} columns.\n".format(num_rows, num_cols))


In [None]:
# City and CategoryName don't seem to have as many values as Date and ItemDescription,
# so count the nulls in both columns.
# - One aggregation computes both counts (one Spark job/scan instead of two).
# - count(when(cond, 1)) counts only the rows where cond holds. Unlike sum(...), it
#   yields 0 (not None) on an empty DataFrame.
# - The functions module is imported under an alias: `from ... import sum` shadowed
#   Python's built-in sum() for the rest of the notebook.
from pyspark.sql import functions as F

null_counts = df.agg(
    F.count(F.when(df['City'].isNull(), 1)).alias('City_Nulls'),
    F.count(F.when(df['CategoryName'].isNull(), 1)).alias('CategoryName_Nulls'),
).first()

# Number of null values in City
city_nulls = null_counts['City_Nulls']
print(city_nulls)

# Number of null values in CategoryName
catname_nulls = null_counts['CategoryName_Nulls']
print(catname_nulls)

In [None]:
# The Date column contains the literal value "Date" (presumably the CSV header read as a
# data row), whose month prefix is "Da". Filter out those records.
# The month prefix is computed inline with the same expression used in Analysis 2. The
# 'Month' column only exists after that later cell runs, so referencing df['Month']
# here failed on a fresh kernel.
from pyspark.sql.functions import split, substring

df = df.filter(substring(split(df['Date'], '/')[0], 1, 2) != 'Da')

In [None]:
# Drop every row that has a null in any column.
# DataFrame.dropna is the DataFrame-level alias of df.na.drop().
df = df.dropna(how='any')

In [None]:
# Print the 10 most frequent values of 'City' and 'CategoryName' to check whether the
# values are uniform (BASELINE CODE).
# This previously used groupBy(col).pivot(col).count(), which has two problems:
# - It pivots a column onto itself: one output column per distinct value, plus an extra
#   job to collect those distinct values.
# - It then ordered by the value itself, so it never showed frequencies.
# A plain groupBy().count() gives the frequency table the comment asks for.
from pyspark.sql.functions import desc

# Top 10 most frequent values in the 'City' column (ties broken alphabetically so the
# listing is deterministic)
df.groupBy('City') \
     .count() \
     .orderBy(desc('count'), 'City') \
     .limit(10) \
     .show()

# Print a separator line
print('\n' + '-'*40 + '\n')

# Top 10 most frequent values in the 'CategoryName' column
df.groupBy('CategoryName') \
     .count() \
     .orderBy(desc('count'), 'CategoryName') \
     .limit(10) \
     .show()



In [None]:
# Print the 10 most frequent values of 'City' and 'CategoryName' to check whether the
# values are uniform (OPTIMIZATION).
# countDistinct(col) grouped by that same col is always 1, so the former "count" column
# carried no information. count('*') gives the real number of rows per value.

from pyspark.sql.functions import count, desc

# Top 10 most frequent values in the 'City' column (ties broken alphabetically)
df.select('City') \
     .groupBy('City') \
     .agg(count('*').alias('count')) \
     .orderBy(desc('count'), 'City') \
     .limit(10) \
     .show()

# Print a separator line
print('\n' + '-'*40 + '\n')

# Top 10 most frequent values in the 'CategoryName' column
df.select('CategoryName') \
     .groupBy('CategoryName') \
     .agg(count('*').alias('count')) \
     .orderBy(desc('count'), 'CategoryName') \
     .limit(10) \
     .show()
# Great: capitalization is uniform and no value is listed twice or more.

In [None]:
# Save the cleaned DataFrame as the Delta table "liqourtable" at the given HDFS path,
# replacing any previous contents and schema of that table.
# NOTE: rows are written as-is; no de-duplication happens here.
(
    df.write
      .format("delta")
      .mode("overwrite")
      .option("overwriteSchema", "true")
      .saveAsTable("liqourtable", path="hdfs:///dis_materials/l3")
)

In [None]:
# IOWA LIQUOR DATA ANALYSIS WITH SPARK DATAFRAME API
# ANALYSIS 1: WHICH 5 CITIES HAVE THE MOST LIQUOR PURCHASES?
# (BASELINE CODE)
from pyspark.sql.functions import desc

# Number of transactions (rows) per city; the baseline does no caching
df_sales_count = df.groupBy('City').count()

# The five cities with the highest transaction counts
top_5_most_sales = df_sales_count.orderBy(df_sales_count['count'].desc()).limit(5)
top_5_most_sales.show()


In [None]:
# IOWA LIQUOR DATA ANALYSIS WITH SPARK DATAFRAME API
# ANALYSIS 1: WHICH 5 CITIES HAVE THE MOST LIQUOR PURCHASES?
# (OPTIMIZATION)
from pyspark.sql.functions import desc

# Per-city transaction counts, marked for caching. cache() is lazy: the counts are stored
# by the first action that scans them and only pay off if df_sales_count is queried again.
df_sales_count = df.groupBy('City').count().cache()

# The five cities with the highest transaction counts
top_5_most_sales = (
    df_sales_count
    .orderBy(desc('count'))
    .limit(5)
)
top_5_most_sales.show()


In [None]:
# ANALYSIS 2: DURING WHICH MONTH IS LIQUOR SOLD THE MOST?

from pyspark.sql.functions import count, desc, substring, split

# 'Month' = the text before the first '/' in Date, truncated to at most 2 characters
df = df.withColumn('Month', substring(split(df['Date'], '/').getItem(0), 1, 2))

# Number of transactions per month, busiest month first
top_months = (
    df.groupBy('Month')
      .agg(count('*').alias('total_sales'))
      .orderBy(desc('total_sales'))
)
top_months.show()
# NB: there are no records for month 7 or month 8 in this dataset.

In [None]:
# ANALYSIS 2: visualizing the number of sales per month with a line plot
from pyspark.sql.functions import col, count, desc, substring, split
import pandas as pd
import matplotlib.pyplot as plt

# Extract the month (text before the first '/') from Date into a new 'Month' column.
# This is the same expression as the previous cell, so re-running it is harmless.
df = df.withColumn('Month', substring(split('Date', '/')[0], 1, 2))

# Filter out records with the invalid month value 'Da'
df = df.filter(df['Month'] != 'Da')

# Group on the month as an integer and sort ascending. Sorting the string column put
# "10", "11", "12" between "1" and "2", so the line was drawn out of calendar order.
top_months = (
    df.groupBy(col('Month').cast('int').alias('Month'))
      .agg(count('*').alias('total_sales'))
      .orderBy('Month')
)

# One row per month, so bringing the result to the driver as pandas is cheap
sales_by_month_pd = top_months.toPandas()

# Line plot of sales by month. Markers make the missing months visible as gaps between points.
fig, ax = plt.subplots()
ax.plot(sales_by_month_pd['Month'], sales_by_month_pd['total_sales'], marker='o', linewidth=2.0)
ax.set_xticks(range(1, 13))
ax.set_xlabel('Month')
ax.set_ylabel('Number of sales')
ax.set_title('Liquor sales by month')
plt.show()


In [56]:
# ANALYSIS 3 (BASELINE CODE): WHICH 10 BRANDS AND 10 LIQUOR TYPES ARE MOST POPULAR IN IOWA?
from pyspark.sql.functions import desc

# Popularity = number of transactions (rows) per value. Each top-10 result is tiny, so it
# is pulled into pandas for printing and for the bar charts further down.
top_brands = (df.groupBy('ItemDescription').count()
                .orderBy(desc('count'))
                .limit(10)
                .toPandas())

top_types = (df.groupBy('CategoryName').count()
               .orderBy(desc('count'))
               .limit(10)
               .toPandas())

# Brands, a dashed separator, then liquor types
print(top_brands, '\n' + '-' * 40 + '\n', top_types, sep='\n')




                  ItemDescription   count
0            TITOS HANDMADE VODKA  133532
1       FIREBALL CINNAMON WHISKEY  120427
2                    BLACK VELVET  115621
3                   HAWKEYE VODKA  103157
4  CAPTAIN MORGAN ORIGINAL SPICED   77107
5                     CROWN ROYAL   68596
6         CROWN ROYAL REGAL APPLE   67859
7                  SMIRNOFF 80PRF   55944
8                SEAGRAMS 7 CROWN   53261
9                        JIM BEAM   53155

----------------------------------------

                CategoryName   count
0            AMERICAN VODKAS  811788
1          CANADIAN WHISKIES  481656
2  STRAIGHT BOURBON WHISKIES  393641
3            WHISKEY LIQUEUR  299874
4    AMERICAN FLAVORED VODKA  252559
5                 SPICED RUM  226778
6           BLENDED WHISKIES  222259
7         100% AGAVE TEQUILA  192765
8          AMERICAN SCHNAPPS  172338
9             COCKTAILS /RTD  171029


                                                                                

In [57]:
# ANALYSIS 3 (OPTIMIZATION): WHICH 10 BRANDS AND 10 LIQUOR TYPES ARE MOST POPULAR IN IOWA?
from pyspark.sql.functions import desc

# Both aggregations below read the same two columns, so project them once and cache the
# projection. The second query then reads it from memory instead of recomputing `df`.
# (The former df.repartition('CategoryName', 'ItemDescription') hash-partitioned on the
# *pair* of columns. That matches neither single-column groupBy, so Spark still shuffled
# again for each aggregation: an extra full shuffle per query rather than a saving.)
df_brand_type = df.select('ItemDescription', 'CategoryName').cache()

# Get top 10 brands by number of sales
top_brands = df_brand_type.groupBy('ItemDescription').count().orderBy(desc('count')).limit(10).toPandas()

# Get top 10 categories by number of sales
top_types = df_brand_type.groupBy('CategoryName').count().orderBy(desc('count')).limit(10).toPandas()

# Both results are now in pandas; release the cached projection
df_brand_type.unpersist()

print(top_brands)
print('\n' + '-'*40 + '\n')
print(top_types)


[Stage 221:>                                                        (0 + 2) / 2]

                  ItemDescription   count
0            TITOS HANDMADE VODKA  133532
1       FIREBALL CINNAMON WHISKEY  120427
2                    BLACK VELVET  115621
3                   HAWKEYE VODKA  103157
4  CAPTAIN MORGAN ORIGINAL SPICED   77107
5                     CROWN ROYAL   68596
6         CROWN ROYAL REGAL APPLE   67859
7                  SMIRNOFF 80PRF   55944
8                SEAGRAMS 7 CROWN   53261
9                        JIM BEAM   53155

----------------------------------------

                CategoryName   count
0            AMERICAN VODKAS  811788
1          CANADIAN WHISKIES  481656
2  STRAIGHT BOURBON WHISKIES  393641
3            WHISKEY LIQUEUR  299874
4    AMERICAN FLAVORED VODKA  252559
5                 SPICED RUM  226778
6           BLENDED WHISKIES  222259
7         100% AGAVE TEQUILA  192765
8          AMERICAN SCHNAPPS  172338
9             COCKTAILS /RTD  171029


                                                                                

In [None]:
# Analysis 3: bar charts of the top 10 brands and the top 10 liquor types by number of sales
import matplotlib.pyplot as plt


def plot_top10_bar(frame, label_col, xlabel, title):
    """Draw and show one bar chart of frame['count'] against frame[label_col].

    Args:
        frame: pandas DataFrame containing `label_col` and a 'count' column.
        label_col: column whose values label the bars (x-axis, rotated 90 degrees).
        xlabel: x-axis label.
        title: chart title.
    """
    plt.bar(frame[label_col], frame['count'])
    plt.xticks(rotation=90)
    plt.xlabel(xlabel)
    plt.ylabel('Number of Sales')
    plt.title(title)
    plt.show()


plot_top10_bar(top_brands, 'ItemDescription', 'Brand', 'Top 10 Brands by Sales')
plot_top10_bar(top_types, 'CategoryName', 'Liquor Type', 'Top 10 Liquor Types by Sales')


In [None]:
# ANALYSIS 4: DO SOME CITIES PREFER CERTAIN LIQUORS OVER OTHERS?
# We now analyze the drinking preferences of residents of Ames and Iowa City, two of the
# largest college towns in Iowa. Specifically, we determine the most popular types of
# liquor in each city based on the available data.

from pyspark.sql.functions import count


def top_types_in_city(city, n=4):
    """Return the n CategoryName values with the most transactions in `city`.

    The result is a pandas DataFrame with columns CategoryName and sales_count
    (number of rows), sorted by sales_count in descending order.
    """
    return (
        df.where(df['City'] == city)
          .groupBy('CategoryName')
          .agg(count('*').alias('sales_count'))
          .orderBy('sales_count', ascending=False)
          .limit(n)
          .toPandas()
    )


top_type_ames = top_types_in_city('AMES')
top_type_ia_city = top_types_in_city('IOWA CITY')

print(top_type_ames.head(4))
print('\n' + '-'*40 + '\n')
print(top_type_ia_city.head(4))

# Fascinating! The same four types of alcoholic beverages top the list in both cities,
# although their order of preference differs.
# However, the outputs suggest that residents of both Ames and Iowa City prefer vodka over
# other alcoholic beverages.