## This notebook performs the initial filtering of our two main sources for the Seattle Public Library data set: the "Inventory" and "Checkout" data. These two CSVs were originally more than ~20 GB and were reduced to ~1 GB Parquet files, which were then published to S3.

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from itertools import islice
from datetime import datetime
from pyspark.sql.functions import desc, col, udf, max as max_
from pyspark.sql import SQLContext
import re

# Reuse an already-running Spark context/session when one exists, otherwise start one.
sc, ss = SparkContext.getOrCreate(), SparkSession.builder.getOrCreate()

### Define schema and helper functions.

In [2]:
# `sc` and `ss` come from the first cell; they are not re-created here.

# Define schemas.
# createDataFrame maps tuple elements to fields by position, so the field order
# here must match the tuple order produced by the row-conversion code below.
inventory_schema = StructType([
    StructField("BibNum", IntegerType(), True),
    StructField("PublicationYear", IntegerType(), True),
    StructField("Author", StringType(), True),
    StructField("Publisher", StringType(), True),
    StructField("ItemType", StringType(), True),
    StructField("ItemCollection", StringType(), True),
    StructField("ItemLocation", StringType(), True),
    StructField("ItemCount", IntegerType(), True),
    StructField("ReportDate", DateType(), True)])

# CheckoutDateTime is stored as a DateType, so only the calendar date of each
# checkout is kept; any time-of-day component is not represented.
checkout_schema = StructType([StructField("BibNumber", IntegerType(), True),
                              StructField("CheckoutDateTime", DateType(), True)])

# Define helper functions
def tryInt(val):
    """Convert ``val`` to an int, or return None when that is not possible.

    Applied to raw CSV fields, so None, empty and non-numeric strings all map
    to None instead of raising.

    Returns:
        int or None
    """
    try:
        return int(val)
    except (TypeError, ValueError, OverflowError):
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit and
        # unrelated errors are no longer silently swallowed.
        return None
    
def itemCount(val):
    """Parse the ItemCount field into an int, or return None if it is not numeric.

    Returns:
        int or None (for None, empty or non-numeric input).
    """
    try:
        return int(val)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures map to None; a bare `except:` would also
        # swallow KeyboardInterrupt/SystemExit.
        return None
    
def tryReplace(val):
    """Extract a four-digit year from a free-form publication-year string.

    All non-digit characters are stripped (e.g. "c2011." -> 2011) and the
    remaining digits are parsed as an int. Strings holding several years
    (e.g. "2011-2012") collapse to more than four digits and are rejected.

    Returns:
        The parsed int when it has exactly four digits, otherwise None
        (also for None/non-string input or input with no digits).
    """
    try:
        # Raw string: '\D' in a plain literal is an invalid escape sequence
        # (DeprecationWarning / SyntaxWarning on newer Pythons).
        year = int(re.sub(r'\D', '', val))
    except (TypeError, ValueError):
        # TypeError: val is not a string (e.g. a null CSV field);
        # ValueError: no digits at all, so int('') fails.
        return None
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    # The length is measured after int(), so leading zeros ("0123") are rejected.
    return year if len(str(year)) == 4 else None
    
def toTimeSafe(inval):
    """Parse the date part of an 'MM/DD/YYYY[ <time>]' string into a datetime.date.

    Everything after the first space (e.g. a time-of-day suffix) is ignored.

    Returns:
        datetime.date, or None when ``inval`` is None or the date part does
        not match '%m/%d/%Y'.
    """
    # Null CSV fields arrive as None; without this guard `.split` raises
    # AttributeError and fails the whole Spark job.
    if inval is None:
        return None
    date_part = inval.split(' ')[0]
    try:
        return datetime.strptime(date_part, '%m/%d/%Y').date()
    except ValueError:
        return None
    
def toTimeSafe2(date):
    """Parse an 'MM/DD/YYYY' string into a datetime.date.

    Unlike ``toTimeSafe``, the whole string must match the format; a trailing
    time component makes the parse fail.

    Returns:
        datetime.date, or None when ``date`` is None or does not match '%m/%d/%Y'.
    """
    # Identity check is the idiomatic None test (PEP 8), rather than `!= None`.
    if date is None:
        return None
    try:
        return datetime.strptime(date, '%m/%d/%Y').date()
    except ValueError:
        return None

### Create inventory table.

In [3]:
# Load the raw inventory CSV, treating its first line as the header row.
tmp_inventory_df = ss.read.option('header', True).csv('Library_Collection_Inventory.csv')

In [4]:
# Generator for RDD.mapPartitions: turns each raw inventory row into a typed
# tuple laid out in the same field order as `inventory_schema`.
def drop_collection(lst):
    """Yield one typed tuple per raw inventory row in the partition ``lst``.

    Positional columns read from each row (the raw CSV layout itself is not
    shown in this notebook - verify indices against the file header):
      0 -> BibNum (tryInt), 4 -> PublicationYear (tryReplace),
      2 -> Author, 5 -> Publisher, 7 -> ItemType, 8 -> ItemCollection,
      10 -> ItemLocation, last -> ItemCount (itemCount),
      second-to-last -> ReportDate (toTimeSafe2).
    All other columns are dropped.
    """
    for row in lst:
        bib_num = tryInt(row[0])
        pub_year = tryReplace(row[4])
        author, publisher = row[2], row[5]
        item_type, collection, location = row[7], row[8], row[10]
        count = itemCount(row[-1])
        report_date = toTimeSafe2(row[-2])
        yield (bib_num, pub_year, author, publisher,
               item_type, collection, location, count, report_date)
        
# Re-type the raw rows partition by partition, then rebuild a DataFrame from the
# resulting tuples using the explicit inventory schema.
library_rdd = (tmp_inventory_df
               .rdd
               .mapPartitions(drop_collection))
inventory_df = ss.createDataFrame(data=library_rdd, schema=inventory_schema)

### Create checkout table.

In [5]:
# Load the raw checkout CSV, treating its first line as the header row.
tmp_checkout_df = ss.read.option('header', True).csv('checkouts_filt.csv')

In [6]:
# Keep only two raw columns - index 2 and the last one - which are parsed below
# into BibNumber (int) and CheckoutDateTime (date) per `checkout_schema`.
checkout_rdd = tmp_checkout_df.rdd.map(lambda row: (row[2], row[-1]))

# Parse each projected pair into typed values and build the final DataFrame.
checkout = checkout_rdd.map(lambda pair: (tryInt(pair[0]), toTimeSafe(pair[1])))
checkout_df = ss.createDataFrame(checkout, schema=checkout_schema)

### Look at both of the tables.

In [7]:
# Preview the first rows of each typed table.
for preview_df in (inventory_df, checkout_df):
    preview_df.show()

+-------+---------------+--------------------+--------------------+--------+--------------+------------+---------+----------+
| BibNum|PublicationYear|              Author|           Publisher|ItemType|ItemCollection|ItemLocation|ItemCount|ReportDate|
+-------+---------------+--------------------+--------------------+--------+--------------+------------+---------+----------+
|2731930|           2011|MacLachlan, Patricia|Atheneum Books fo...|    jcbk|         ncfic|         gwd|        1|2019-02-01|
|2983336|           2014|O'Connell, Robert L.|       Random House,|    acbk|           nab|         bea|        1|2019-02-01|
|3343596|           2013|                null|      Kino Classics,|   acdvd|         nadvd|         qna|        1|2019-02-01|
|2715824|           2011|Thrasher, Travis,...|        Oasis Audio,|    accd|        cabocd|         cen|        1|2019-02-01|
|3331687|           2017|                null|          Kingswell,|    acbk|          nanf|         gwd|        1|2019

### Write the tables to .parquet.

In [8]:
# inventory_df.write.parquet('Inventory')
# checkout_df.write.parquet('Checkouts')

### Read the parquet files and verify they worked.

In [9]:
# sqlContext = SQLContext(sc)
# inventory_df = sqlContext.read.parquet('Inventory')
# checkout_df = sqlContext.read.parquet('Checkouts')

### Push to S3.

In [10]:
# ! aws s3 cp Inventory s3://intersession-distcomp/Inventory --recursive
# ! aws s3 cp Checkouts s3://intersession-distcomp/Checkouts --recursive