# SparkSQL Script
Author: Evan Bariquit

1. Time to count the occurences of each element in the file
2. Time to alphabetize the elements
2. Time to total the amounts for each transCat

In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import trim
from pyspark.sql.functions import sum
from pyspark.sql.types import IntegerType
import timeit


# Create Spark Session.
sparkContext = SparkContext()
spark = SparkSession(sparkContext)


In [2]:
# Import dataset.
# 'header = True' b/c our file has column headers
upload_startTime = timeit.default_timer()
datalog = spark.read.load("Data/datalog2.csv", format="csv", header=True, inferSchema=True)
upload_stopTime = timeit.default_timer()

# Remove leading/trailing whitespaces
for col_name in datalog.columns:
    datalog = datalog.withColumn(col_name, trim(col(col_name)))

# Spark interprets the schema of these columns as strings; convert to integers.
datalog = datalog.withColumn("distance", col("distance").cast(IntegerType()))
datalog = datalog.withColumn("amount", col("amount").cast(IntegerType()))

In [3]:
# For viewing a sample of the data
datalog.show()
print("Time to load the dataset into a dataframe:", upload_stopTime - upload_startTime)

+-------+-----------+-----+--------+-------------+-------------+------+
| vendor|       city|state|distance|     transCat|    transType|amount|
+-------+-----------+-----+--------+-------------+-------------+------+
|    QFC|   Kirkland|   WA|       8|wire transfer|    gift card|   160|
|Safeway|    Seattle|   WA|      20|    withdrawl|          ATM|   118|
|Safeway|Woodinville|   WA|       0|      grocery|personal care|     6|
|    QFC|   Kirkland|   WA|       8|  electronics|           TV|  4647|
|Safeway|Woodinville|   WA|       0|   restaurant|        steak|   305|
|Safeway|Woodinville|   WA|       0|   restaurant|      dim sum|   156|
| Amazon|     Online|   NA|       0|     clothing|        shoes|   154|
| Amazon|     Online|   NA|       0|  electronics|       stereo|  2520|
| Amazon|     Online|   NA|       0|   restaurant|    fast food|    88|
|Safeway|    Seattle|   WA|      20|      grocery|         food|  1766|
|    QFC|   Kirkland|   WA|       8|      grocery|personal care|

## Counting frequencies of each element

Implementation:
    1. Number of occurences will be stored in the 'counts' spark dataframe.
    2. Parse each column of the data file.
        a. get occurences of elements within column.
        b. store occurences in 'counts'
        NOTE: this creates duplicates!
        eg: 'QFC' appears in 'vendor' and 'transType' columns.
            We will have two counts for QFC (one for its occurences 
            in vendor, the other for occurences in transType).
    3. Condense duplicates.

In [4]:
# Where we will store the counts per element
counts = spark.createDataFrame([('test',0)],['element','count'])

# Begin.
counting_startTime = timeit.default_timer()

for column in datalog.columns:
    subset = datalog.groupBy(column).count().withColumnRenamed(column, 'element')
    # NOTE:
        # (Distance/Amount)
        # Do we want to count occurences for these columns? 
        # They aren't counted in Stephen's implementation, 
        # so skipping them here for consistency.
    if column != 'distance' and column != 'amount':
        counts = counts.unionByName(subset)

# Condenses duplicates.
counts = counts.groupBy('element').agg(sum('count').alias('count'))

# Finished.
counting_stopTime = timeit.default_timer()
counts.show()
print("Time to count frequencies:", counting_stopTime - counting_startTime)

+-----------+--------+
|    element|   count|
+-----------+--------+
|      shoes|  770109|
|      shirt|  770635|
|         TV| 1415465|
|        ATM|  531296|
|     stereo| 1412284|
|         NA| 5877587|
|        QFC| 6211060|
|       BANK|  530508|
|Wells Fargo|  339341|
|       food| 4239187|
| cell phone| 1413560|
| Fred Meyer|  338942|
|    Safeway|12084417|
|  withdrawl| 1061804|
|      dress|  770975|
|   Kirkland| 5871642|
|    dim sum|  423326|
|      jeans|  770263|
|         WA|17616654|
|      movie|  605755|
+-----------+--------+
only showing top 20 rows

Time to count frequencies: 0.10414709999999872


## Sorting the above dataframe

In [5]:
# Begin.
sorting_startTime = timeit.default_timer()

counts = counts.sort("element", ascending=True)

# Finished.
sorting_stopTime = timeit.default_timer()
counts.show()
print("Time to sort:", sorting_stopTime - sorting_startTime)

+-----------+--------+
|    element|   count|
+-----------+--------+
|        ATM|  531296|
|     Amazon| 5877587|
|       BANK|  530508|
| Fred Meyer|  338942|
|   Kirkland| 5871642|
|         NA| 5877587|
|     Online| 5877587|
|        QFC| 6211060|
|    Safeway|12084417|
|    Seattle| 5875077|
|         TV| 1415465|
|         WA|17616654|
|Wells Fargo|  339341|
|Woodinville| 5869935|
|    bowling|  604942|
| cell phone| 1413560|
|   clothing| 3081982|
|   computer| 1415455|
|    dim sum|  423326|
|     dining|  605768|
+-----------+--------+
only showing top 20 rows

Time to sort: 0.03509410000000912


## Total the amounts per transCat

In [6]:
# Begin.
amounts_startTime = timeit.default_timer()

totalByCat = datalog.groupBy('transCat').agg(sum('amount').alias('total'))

# Finished.
amounts_stopTime = timeit.default_timer()
totalByCat.show()
print("Time to total the amounts spent by transCat:", amounts_stopTime - amounts_startTime)

+-------------+-----------+
|     transCat|      total|
+-------------+-----------+
|    withdrawl|  164560030|
|      grocery| 4685652942|
|  electronics|10182284690|
|     clothing| 1472117693|
|entertainment|  541594417|
|wire transfer| 6883671707|
|   restaurant|  424038841|
+-------------+-----------+

Time to total the amounts spent by transCat: 0.03242509999998333
