## Using tsv file and DataFrame + Schema manipulation
Using the DataFrameReader that is associated with our SparkSession.

In [2]:
import findspark

from IPython.display import display
from pyspark.sql import SparkSession

import os

# Tell findspark which local Spark installation to use. This runs before the
# session is created below.
findspark.init("/usr/local/spark-2.3.0-bin-hadoop2.7/")

# Reuse the running SparkSession if there is one, otherwise start a new one
# under the application name "basics".
spark = SparkSession.builder.appName("basics").getOrCreate()

# Tab-separated purchases sample. Set the PURCHASES_TSV environment variable to
# run the notebook against a copy of the file in another location; when it is
# unset, the original local path is used unchanged.
test_file = os.environ.get(
    "PURCHASES_TSV",
    "/Users/asapehrsson/dev/learn/intro_to_hadoop_and_mapreduce/data/purchases_sample.tsv",
)

import pyspark.sql.functions as func
from pyspark.sql.types import *

# Explicit schema for the purchases file: six nullable fields.
# Background reading on Spark SQL schemas and complex types:
# https://databricks.com/blog/2017/06/13/five-spark-sql-utility-functions-extract-explore-complex-data-types.html
# NOTE(review): "cost" is a 32-bit FloatType, which is likely why the summed
# totals shown further down carry rounding noise (e.g. 44.470001220703125).
# Consider DoubleType or DecimalType if exact totals matter.

schema = StructType([
    StructField("date", DateType(), True),
    StructField("time", StringType(), True),
    StructField("store", StringType(), True),
    StructField("item", StringType(), True),
    StructField("cost", FloatType(), True),
    StructField("payment", StringType(), True),
])

# Load the tab-separated file through the session's DataFrameReader, applying
# the explicit schema defined above rather than inferring column types.
ds = (
    spark.read
    .schema(schema)
    .option("sep", "\t")
    .csv(test_file)
)

# Show the resulting column names and types.
ds.printSchema()

root
 |-- date: date (nullable = true)
 |-- time: string (nullable = true)
 |-- store: string (nullable = true)
 |-- item: string (nullable = true)
 |-- cost: float (nullable = true)
 |-- payment: string (nullable = true)



### Calculate total sales per store. Sort by total sales. Show a few.
sort() and orderBy() are aliases.

The inner workings of sort are described here:
https://stackoverflow.com/questions/32887595/how-does-spark-achieve-sort-order

In [4]:
# Sum "cost" per store, order by that total (sort() is ascending by default)
# and print the first five rows.
(
    ds.groupBy("store")
    .agg(func.sum("cost").alias("sum_column_name"))
    .sort("sum_column_name")
    .show(5)
)


+----------+------------------+
|     store|   sum_column_name|
+----------+------------------+
|   Madison|44.470001220703125|
|     Tampa| 53.18000030517578|
|     Miami|             53.25|
|   Hialeah|115.20999908447266|
|Birmingham|118.04000091552734|
+----------+------------------+
only showing top 5 rows



### Calculate total sales per store. Order by total sales. take() returns a list of Row objects.

In [5]:
from IPython.display import HTML, display
import tabulate

# Same per-store aggregation, largest totals first; take(3) brings the first
# three rows back to the driver.
test_data = (
    ds.groupBy("store")
    .agg(func.sum("cost").alias("sum_column_name"))
    .sort(func.desc("sum_column_name"))
    .take(3)
)

# Render the rows as an HTML table in the notebook output.
display(HTML(tabulate.tabulate(test_data, tablefmt='html')))

0,1
Chula Vista,1118.65
Portland,1037.1
Gilbert,969.41


In [14]:
# Print the physical plan for a global sort on "store".
# orderBy() is an alias of sort(), so the plan is the same either way.
ds.orderBy("store").explain()

== Physical Plan ==
*(2) Sort [store#139 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(store#139 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [date#137,time#138,store#139,item#140,cost#141,payment#142] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/asapehrsson/dev/learn/intro_to_hadoop_and_mapreduce/data/purchases_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<date:date,time:string,store:string,item:string,cost:float,payment:string>


### Calculate total sales per store. Alt 2. Order by total sales.

In [15]:
from pyspark.sql.functions import desc

# Alternative spelling of the per-store total: the grouped sum() produces a
# column named "sum(cost)", which is renamed before sorting in descending
# order. Only the first five rows are collected.
(
    ds.groupBy("store")
    .sum("cost")
    .withColumnRenamed("sum(cost)", "sum_total")
    .sort(desc("sum_total"))
    .limit(5)
    .collect()
)


[Row(store='Chula Vista', sum_total=1118.650001525879),
 Row(store='Portland', sum_total=1037.1000061035156),
 Row(store='Gilbert', sum_total=969.4100036621094),
 Row(store='Norfolk', sum_total=894.0899963378906),
 Row(store='Corpus Christi', sum_total=879.3999938964844)]

In [16]:
from pyspark.sql.functions import dayofweek

# Total sales grouped by the day of week extracted from "date", largest first.
# NOTE(review): Spark documents dayofweek() as 1 = Sunday ... 7 = Saturday;
# confirm for the Spark version in use.
(
    ds.groupBy(dayofweek("date").alias("dayofweek"))
    .sum("cost")
    .withColumnRenamed("sum(cost)", "sum_total")
    .sort(desc("sum_total"))
    .collect()
)

[Row(dayofweek=2, sum_total=24596.899976730347)]