In [1]:
import os
import sys

# Point both PySpark interpreter settings at the Python executable that is
# running this kernel, so they cannot resolve to a different installation.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import date, timedelta, datetime
from pyspark.sql.window import Window

from IPython.display import display

In [3]:
def disp(df, n=None):
    """Render a Spark DataFrame as a pandas table in the notebook.

    Args:
        df: Spark DataFrame to show (any object exposing ``limit`` and
            ``toPandas`` works).
        n: Maximum number of rows to show. ``None`` (the default) shows every
            row. A negative value keeps pandas ``head`` semantics, i.e. all
            rows except the last ``|n|``.

    Returns:
        None. The table is emitted through ``display``.
    """
    if n is None:
        display(df.toPandas())
    elif n >= 0:
        # Truncate on the Spark side so that only ``n`` rows are collected to
        # the driver, instead of materialising the whole DataFrame in pandas
        # and slicing it afterwards.
        display(df.limit(n).toPandas())
    else:
        # Spark's ``limit`` does not accept negative counts; fall back to
        # pandas slicing to keep the previous behaviour for this case.
        display(df.toPandas().head(n))

In [4]:
# Create the notebook's SparkSession (or reuse one that already exists),
# with the master fixed to "local[*]".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("mymain")
    .getOrCreate()
)
# Alternative that leaves the master unset (presumably for when it is supplied externally):
# spark = SparkSession.builder.appName("mymain").getOrCreate()

## First Check

- Check the schema
- Check for malformed rows
- Check numeric columns by setting the schema explicitly

In [6]:
def read_csv_permissive(path, schema, corrupt_column="malformed_rows"):
    """Read a headered CSV in PERMISSIVE mode with an explicit schema.

    The schema is applied as given (nothing is inferred). In PERMISSIVE mode
    Spark keeps records it cannot parse and stores their raw text in the
    column named by ``columnNameOfCorruptRecord``.

    Args:
        path: Location of the CSV file.
        schema: StructType to apply. It must contain a string field called
            ``corrupt_column`` for the raw text of malformed records.
        corrupt_column: Name of the field that receives malformed records.

    Returns:
        A Spark DataFrame over the file.
    """
    return (
        spark.read
        .format("csv")
        .option("mode", "PERMISSIVE")
        .option("columnNameOfCorruptRecord", corrupt_column)
        .option("encoding", "utf-8")
        .option("lineSep", "\n")
        .option("header", "true")
        .schema(schema)
        .load(path)
    )


# Orders: numeric columns are typed explicitly so that values which do not
# parse as numbers mark the row as malformed.
order_schema = StructType([
    StructField("index", StringType(), True),
    StructField("order_id", StringType(), True),
    StructField("sellout_date", StringType(), True),
    StructField("consumer_id", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("net_value_dkk", DoubleType(), True),
    StructField("store_id", StringType(), True),
    StructField("store_type", StringType(), True),
    StructField("product_id_with_size", StringType(), True),
    StructField("malformed_rows", StringType(), True),
])

order = read_csv_permissive("./Data/order.csv", order_schema)

# Show every order row that Spark flagged as malformed (empty output = none).
disp(order.filter("malformed_rows is not null"))

# Products: all attributes are read as strings.
product_schema = StructType([
    StructField("index", StringType(), True),
    StructField("product_id_with_size", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("category", StringType(), True),
    StructField("collection", StringType(), True),
    StructField("color", StringType(), True),
    StructField("metal", StringType(), True),
    StructField("theme", StringType(), True),
    StructField("malformed_rows", StringType(), True),
])

product = read_csv_permissive("./Data/product.csv", product_schema)

# Show every product row that Spark flagged as malformed (empty output = none).
disp(product.filter("malformed_rows is not null"))

Unnamed: 0,index,order_id,sellout_date,consumer_id,quantity,net_value_dkk,store_id,store_type,product_id_with_size,malformed_rows


Unnamed: 0,index,product_id_with_size,product_id,category,collection,color,metal,theme,malformed_rows


## General Checks

- Check dates
- Check uniqueness
- Check missing values

In [5]:
def read_csv_failfast(path, schema):
    """Read a headered CSV in FAILFAST mode with an explicit schema.

    In FAILFAST mode Spark raises an error when it meets a record that does
    not fit the schema instead of keeping it. Spark evaluates lazily, so the
    error surfaces when an action runs on the result, not at load time.

    Args:
        path: Location of the CSV file.
        schema: StructType to apply (nothing is inferred).

    Returns:
        A Spark DataFrame over the file.
    """
    return (
        spark.read
        .format("csv")
        .option("mode", "FAILFAST")
        .option("encoding", "utf-8")
        .option("lineSep", "\n")
        .option("header", "true")
        .schema(schema)
        .load(path)
    )


orders_schema = StructType([
    StructField("index", StringType(), True),
    StructField("order_id", StringType(), True),
    StructField("sellout_date", StringType(), True),
    StructField("consumer_id", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("net_value_dkk", DoubleType(), True),
    StructField("store_id", StringType(), True),
    StructField("store_type", StringType(), True),
    StructField("product_id_with_size", StringType(), True),
])

# The leading "index" column is only needed to line the schema up with the
# file; it is dropped right after loading.
orders = read_csv_failfast("./Data/order.csv", orders_schema).drop("index")

products_schema = StructType([
    StructField("index", StringType(), True),
    StructField("product_id_with_size", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("category", StringType(), True),
    StructField("collection", StringType(), True),
    StructField("color", StringType(), True),
    StructField("metal", StringType(), True),
    StructField("theme", StringType(), True),
])

# Same treatment as for orders: load strictly, then drop the "index" column.
products = read_csv_failfast("./Data/product.csv", products_schema).drop("index")

In [8]:
# Count order rows by whether ``sellout_date`` parses as a ``yyyy-MM-dd`` date.
# ``isNotNull()`` already yields a boolean column, so unparsable or missing
# dates are counted under False. A ``when(...)`` without ``otherwise`` would
# instead put them in a NULL bucket, which contradicts the column name.
disp(
    orders
    .withColumn(
        "sellout_date_is_valid_date",
        F.to_date("sellout_date", "yyyy-MM-dd").isNotNull(),
    )
    .groupBy("sellout_date_is_valid_date").count()
)

Unnamed: 0,sellout_date_is_valid_date,count
0,True,916074
