In [1]:
import pyspark
import pyspark.sql.types as stypes
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, LongType
from pyspark.sql.functions import pandas_udf

import numpy as np
import pandas as pd

In [2]:
spark = pyspark.sql.SparkSession.builder.master("local").getOrCreate()
spark

# Load data

In [3]:
def save_parquet(filename, df, host="hdfs://ca4022-m"):
    df.write.save(host + filename, format="parquet", mode="overwrite")
    
def read_parquet(filename, host="hdfs://ca4022-m"):
    return spark.read.load(host + filename)

In [None]:
# hdfs://HOST/path_to_file
file = "hdfs://ca4022-m/user/adam/input/Iowa_Liquor_Sales.csv"
df = spark.read.format("csv") \
    .option("inferSchema", True) \
    .option("header", True) \
    .option("multiLine", True).load(file)#.toDF(*columns)

In [5]:
df.show(1, truncate=False, vertical=True)

-RECORD 0--------------------------------------------------------------
 Invoice/Item Number   | S29198800001                                  
 Date                  | 11/20/2015                                    
 Store Number          | 2191                                          
 Store Name            | Keokuk Spirits                                
 Address               | 1013 MAIN                                     
 City                  | KEOKUK                                        
 Zip Code              | 52632                                         
 Store Location        | 1013 MAIN
KEOKUK 52632
(40.39978, -91.387531) 
 County Number         | 56                                            
 County                | Lee                                           
 Category              | null                                          
 Category Name         | null                                          
 Vendor Number         | 255                                    

# Fix column names

In [8]:
df2 = df
for col in df.columns:
    df2 = df2.withColumnRenamed(col, 
                                col.replace(" ", "_") # Replace spaces
                                .replace("(", "") # Remove brackets
                                .replace(")", ""))

# Save as parquet format

In [9]:
save_parquet("/user/adam/parquet/iowa", df2)

In [4]:
df = read_parquet("/user/adam/parquet/iowa")

# Data cleaning

In [5]:
@pandas_udf("string")
def clean_location(s: pd.Series) -> pd.Series:
    # Remove newlines
    return s.str.replace("\n", " ")

@pandas_udf("float")
def get_lat(s: pd.Series) -> pd.Series:
    latlong = s.str.extract(r"\(([-]?[\d\.]+), ([-]?[\d\.]+)\)")
    return latlong[0].astype(float)

@pandas_udf("float")
def get_lon(s: pd.Series) -> pd.Series:
    latlong = s.str.extract(r"\(([-]?[\d\.]+), ([-]?[\d\.]+)\)")
    return latlong[1].astype(float)

@pandas_udf("float")
def parse_dollar(s: pd.Series) -> pd.Series:
    # Remove dollar sign
    return s.str.slice(start=1).astype(float)

- Extract location coordinates
- Parse date
- Parse prices
- Fix newlines in location
- Split invoice & item numbers

In [6]:
df2 = df.withColumn("lat", get_lat("Store_Location")) \
    .withColumn("lon", get_lon("Store_Location")) \
    .withColumn("Date", F.to_date("Date", format="M/d/y")) \
    .withColumn("State_Bottle_Cost", parse_dollar("State_Bottle_Cost")) \
    .withColumn("State_Bottle_Retail", parse_dollar("State_Bottle_Retail")) \
    .withColumn("Sale_Dollars", parse_dollar("Sale_Dollars")) \
    .withColumn("Store_Location", clean_location("Store_Location")) \
    .withColumn("invoice_number", F.col("Invoice/Item_Number").substr(0, 6)) \
    .withColumn("line_number", F.col("Invoice/Item_Number").substr(7, 6)) \
    .drop("Invoice/Item_Number", "Store_Location")

In [12]:
df2.show(1, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------
 Date                | 2012-10-24                               
 Store_Number        | 2648                                     
 Store_Name          | Hy-Vee #4 / WDM                          
 Address             | 555 S 51ST ST                            
 City                | WEST DES MOINES                          
 Zip_Code            | 50265                                    
 County_Number       | 77                                       
 County              | Polk                                     
 Category            | 1081390                                  
 Category_Name       | IMPORTED SCHNAPPS                        
 Vendor_Number       | 260                                      
 Vendor_Name         | Diageo Americas                          
 Item_Number         | 69945                                    
 Item_Description    | Rumple Minze Peppermint Schnapps Liqueur 
 Pack                | 12

In [7]:
save_parquet("/user/adam/parquet/cleaned", df2)

In [4]:
df2 = read_parquet("/user/adam/parquet/cleaned")

# More cleaning
- Split up the tables into their most simple groups  
- Clean each of the tables by finding the modal value for each group
- Combine the tables again

In [9]:
def mode(df, group_col, val_col):
    """
    Calculate the modal value for a group
    """
    
    if type(group_col) == str:
        group_col = [group_col]
    
    
    # Count unique values
    counts = df.groupby(*group_col, val_col).count().alias("count")
    
    # Select the maximum from each group
    return counts.groupBy(*group_col) \
        .agg(F.max(F.struct("count", val_col)).alias("max")) \
        .select(*group_col, f"max.{val_col}")

@pandas_udf("float")
def median(s: pd.Series) -> float:
    return s.quantile(0.5)

In [10]:
store_names = mode(df2, "Store_Number", "Store_Name")
vendor_names = mode(df2, "Vendor_Number", "Vendor_Name")
items = mode(df2, "Item_Number", "Item_Description")
county_names = mode(df2, "County_Number", "County")
store_locations = mode(df2, ["Store_Number", "Zip_Code"], "Address")
categories = mode(df2, "Category", "Category_Name")

zip_county = mode(df2, "Zip_Code", "County_Number")
zip_city = mode(df2, "Zip_Code", "City")
zip_codes = zip_county.join(zip_city, "Zip_Code", "inner")

gps = df2.groupby("Store_Number", "Zip_Code").agg(
    median("lat").alias("median_lat"),
    median("lon").alias("median_lon"),
)

In [11]:
df_raw = df2.select(
    "Date", 
    "Store_Number", 
    "Zip_Code",
    "Category",
    "Vendor_Number", 
    "Item_Number",
    "Pack",
    "Bottle_Volume_ml", 
    'State_Bottle_Cost',
    'State_Bottle_Retail',
    'Bottles_Sold',
    'Sale_Dollars',
    'Volume_Sold_Liters',
    'Volume_Sold_Gallons',
    'invoice_number',
    'line_number'
)

df_joined = df_raw.join(store_names, "Store_Number", how="left") \
    .join(store_locations, ["Store_Number", "Zip_Code"], how="left") \
    .join(gps, ["Store_Number", "Zip_Code"], how="left") \
    .join(zip_codes, "Zip_Code", how="left") \
    .join(county_names, on="County_Number", how="left") \
    .join(vendor_names, on="Vendor_Number", how="left") \
    .join(items, on="Item_Number", how="left") \
    .join(categories, on="Category", how="left")

In [13]:
df_joined.show(1, truncate=False, vertical=True)

-RECORD 0---------------------------------------
 Category            | 1022200                  
 Item_Number         | 87849                    
 Vendor_Number       | 85                       
 County_Number       | 31                       
 Zip_Code            | 52002                    
 Store_Number        | 2624                     
 Date                | 2017-10-10               
 Pack                | 6                        
 Bottle_Volume_ml    | 750                      
 State_Bottle_Cost   | 23.99                    
 State_Bottle_Retail | 35.99                    
 Bottles_Sold        | 2                        
 Sale_Dollars        | 35.99                    
 Volume_Sold_Liters  | 1.5                      
 Volume_Sold_Gallons | 0.39                     
 invoice_number      | INV-07                   
 line_number         | 863000                   
 Store_Name          | Hy-Vee #2 / Dubuque      
 Address             | 2395 NW ARTERIAL RD      
 median_lat         

In [None]:
save_parquet("/user/adam/parquet/cleaned2", df_joined)

In [5]:
df_joined = read_parquet("/user/adam/parquet/cleaned2")

# Comparison

In [6]:
def count_null(df):
    null_counts = []
    for i, c in enumerate(df.columns):
        print("\r", i+1, "/", len(df.columns), end="", flush=True)
        null_counts.append({
            "column": c,
            "nulls": df.filter(F.col(c).isNull()).count()
        })
        
    print()
        
    return pd.DataFrame(null_counts).set_index("column").sort_values("nulls", ascending=False)

In [7]:
df2_null = count_null(df2)
df_joined_null = count_null(df_joined)

 26 / 26
 26 / 26


In [8]:
counts = pd.concat([
    df2_null, 
    df_joined_null.rename(index={"median_lat": "lat", "median_lon": "lon"})
], axis=1)
counts.columns = ["Nulls Before Cleaning", "Nulls After Cleaning"]
counts

Unnamed: 0,Nulls Before Cleaning,Nulls After Cleaning
lon,941306,213676
lat,941306,213676
County_Number,79178,2420
County,79178,2420
Category_Name,16086,9024
Category,8020,8020
Zip_Code,2420,2420
Address,2376,2421
City,2375,2420
State_Bottle_Cost,10,10
