### Lab 2: Semi-structured Data

This notebook performs data wrangling on clickstream data.

It uses the Spark DataFrame API.

The clickstream data comes from the RecSys Challenge 2015: https://2015.recsyschallenge.com/

### Step 1: Set the data location and type

There are two ways to access Azure Blob storage: account keys and shared access signatures (SAS). This notebook uses an account key.

To get started, we need to set the location and type of the file.


https://docs.microsoft.com/en-us/azure/azure-databricks/databricks-extract-load-sql-data-warehouse

In [3]:
storage_account_name = "<Storage Account Name Here>"
storage_account_access_key = "<Storage Account Key Here>"

In [4]:
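# wasbs URIs have the form wasbs://<container>@<account>.blob.core.windows.net/<path>
file_location = "wasbs://<container_name>@<storage_account_name>.blob.core.windows.net/yoochoose-clicks.dat"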
file_type = "csv"

In [5]:
spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)

In [6]:
display(dbutils.fs.ls("wasbs://<container_name>@<storage_account_name>.blob.core.windows.net/"))
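The placeholders above follow a fixed pattern: the data URI is `wasbs://<container>@<account>.blob.core.windows.net/<path>`, and the account key is set under the Spark configuration key `fs.azure.account.key.<account>.blob.core.windows.net`. A minimal sketch of building both strings from their parts; the helper names `wasbs_url` and `account_key_conf` and the example values are hypothetical, not part of any Azure or Databricks API.

```python
def wasbs_url(container: str, account: str, path: str = "") -> str:
    """Build a URI of the form wasbs://<container>@<account>.blob.core.windows.net/<path>.

    Hypothetical helper for illustration only.
    """
    return f"wasbs://{container}@{account}.blob.core.windows.net/{path.lstrip('/')}"


def account_key_conf(account: str) -> str:
    """Return the Spark conf key under which the storage account key is set (hypothetical helper)."""
    return f"fs.azure.account.key.{account}.blob.core.windows.net"


if __name__ == "__main__":
    # Example values are made up; substitute your own container and account names
    print(wasbs_url("clicks", "mystorage", "yoochoose-clicks.dat"))
    print(account_key_conf("mystorage"))
```

In the notebook these strings would be passed to `spark.conf.set(...)` and `spark.read...load(...)` in place of the hand-edited placeholders.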

### Step 2: Read the data

Now that we have specified our file metadata, we can create a DataFrame. Notice that we use the `inferSchema` option so that Spark infers column types from the file. We could instead set an explicit schema if we already have one. Because the file has no header row, Spark names the columns `_c0` through `_c3`, so we rename them after loading.
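To make the difference between inferring and declaring a schema concrete, here is a plain-Python sketch (no Spark) that applies an explicit schema to header-less lines in the `yoochoose-clicks.dat` layout: session ID, ISO 8601 timestamp, item ID, category. The names `SCHEMA`, `parse_timestamp`, and `parse_clicks` are hypothetical, and the timestamp format with milliseconds and a trailing `Z` is an assumption about the file. In Spark itself, the equivalent is passing a schema to `spark.read.schema(...)` instead of setting `inferSchema`.

```python
import csv
import io
from datetime import datetime


def parse_timestamp(value: str) -> datetime:
    # Assumed format, e.g. 2014-04-07T10:51:09.277Z; returns a naive datetime (UTC implied)
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")


# Hypothetical explicit schema: (column name, conversion function), in file order
SCHEMA = [
    ("SessionID", int),
    ("Timestamp", parse_timestamp),
    ("ProductID", int),
    ("Category", str),  # not always numeric (e.g. "S"), so kept as text
]


def parse_clicks(text: str) -> list:
    """Parse header-less CSV text into dicts typed according to SCHEMA."""
    rows = []
    for record in csv.reader(io.StringIO(text)):
        if not record:  # skip blank lines
            continue
        rows.append({name: convert(value) for (name, convert), value in zip(SCHEMA, record)})
    return rows


if __name__ == "__main__":
    # Illustrative line in the yoochoose-clicks.dat layout
    print(parse_clicks("1,2014-04-07T10:51:09.277Z,214536502,0\n"))
```

Declaring the schema up front avoids the extra pass over the data that inference needs and pins down types such as `Category` that could otherwise be inferred differently from file to file.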

#### Best Python Libraries for Data Transformations

In Python, the main choice is between Spark DataFrames and pandas. Other Python libraries for data transformation and ETL include:

petl

pandas

SQLAlchemy

Bonobo

https://github.com/pawl/awesome-etl#python

https://docs.databricks.com/spark/latest/spark-sql/spark-pandas.html (using Apache Arrow to speed up conversion between Spark and pandas DataFrames)

#### Transformation Links with Spark DataFrames

https://mapr.com/blog/using-apache-spark-dataframes-processing-tabular-data/

https://mapr.com/products/apache-spark/

https://docs.azuredatabricks.net/_static/notebooks/transform-complex-data-types-python.html 

https://thepythonguru.com/python-how-to-read-and-write-csv-files/

https://docs.azuredatabricks.net/data/data-sources/read-csv.html

In [9]:
df = spark.read.format(file_type).option("inferSchema", "true").load(file_location)
df = df.withColumnRenamed("_c0", "SessionID")
df = df.withColumnRenamed("_c1", "Timestamp")
df = df.withColumnRenamed("_c2", "ProductID")
df = df.withColumnRenamed("_c3", "Category")
df.printSchema()

In [10]:
display(df.summary())

### Step 3: Query the data

Now that we have created our DataFrame, we can query it. For instance, you can identify particular columns to select and display.

#### Links for Querying Data

https://spark.apache.org/docs/1.6.0/sql-programming-guide.html

https://hackersandslackers.com/transforming-pyspark-dataframes/

In [12]:
display(df.select("SessionID","Timestamp","ProductID","Category"))

### Step 4: (Optional) Create a view or table

If you want to query this data as a table, you can simply register it as a *view* or a table.

In [14]:
df.createOrReplaceTempView("Clicks")

We can query this view using Spark SQL. The `%sql` magic command at the top of a cell runs that cell as SQL, so the view can be queried directly. For instance, this query returns every row:

In [16]:
%sql

SELECT * FROM Clicks

Since this table is registered as a temp view, it will be available only to this notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.

In [18]:
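# mode("overwrite") lets the cell be re-run; the default mode fails if the table already exists
df.write.format("parquet").mode("overwrite").saveAsTable("Clicks")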

In [19]:
%sql

SELECT 
SessionID,
min(Timestamp) as FirstTimestamp, 
max(Timestamp) as LastTimestamp,
unix_timestamp(max(Timestamp)) - unix_timestamp(min(Timestamp)) as Duration,
max(Category) as LastCategory,
min(ProductID) as FirstProduct, 
max(ProductID) as LastProduct,
COUNT(*) as SessionCount 
FROM Clicks 
group by SessionID 
order by SessionID
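Note that in the query above, `max(Category)` returns the largest category value in the session, not the category of the last click, and `min(ProductID)`/`max(ProductID)` return the smallest and largest item IDs rather than the first and last items clicked. `SessionCount` is the number of clicks in the session. The plain-Python sketch below (no Spark; `summarize_sessions` is a hypothetical name) takes first and last values by timestamp order instead, which is what the column names suggest. If your runtime is Spark 3.0 or later, the built-in `min_by`/`max_by` aggregates (for example `max_by(Category, Timestamp)`) can express the same idea in SQL.

```python
from collections import defaultdict
from datetime import datetime


def summarize_sessions(clicks):
    """Summarize clicks per session, taking first/last values by timestamp order.

    clicks: iterable of (session_id, timestamp, product_id, category) tuples,
    where timestamp is a datetime.
    """
    by_session = defaultdict(list)
    for session_id, ts, product_id, category in clicks:
        by_session[session_id].append((ts, product_id, category))

    summary = {}
    for session_id in sorted(by_session):
        # Stable sort on the timestamp only, so ties keep their input order
        events = sorted(by_session[session_id], key=lambda e: e[0])
        first, last = events[0], events[-1]
        # Truncate to whole seconds before subtracting, mirroring unix_timestamp()
        duration = last[0].replace(microsecond=0) - first[0].replace(microsecond=0)
        summary[session_id] = {
            "FirstTimestamp": first[0],
            "LastTimestamp": last[0],
            "Duration": int(duration.total_seconds()),
            "FirstProduct": first[1],  # item of the earliest click
            "LastProduct": last[1],  # item of the latest click
            "LastCategory": last[2],  # category of the latest click
            "SessionCount": len(events),  # number of clicks in the session
        }
    return summary


if __name__ == "__main__":
    # Illustrative clicks, deliberately out of time order
    example = [
        (1, datetime(2014, 4, 7, 10, 51, 9, 277000), 214536502, "0"),
        (1, datetime(2014, 4, 7, 10, 57, 9, 868000), 214536500, "0"),
        (1, datetime(2014, 4, 7, 10, 54, 9, 868000), 214536506, "S"),
    ]
    print(summarize_sessions(example))
```

With the illustrative clicks above, the last click's category is `"0"`, while `max(Category)` would return `"S"`, and the last product is `214536500`, which `max(ProductID)` would not pick.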

In [20]:
# The predefined `spark` session replaces the deprecated SQLContext(sc)
dfsum = spark.sql("""
SELECT SessionID, min(Timestamp) AS FirstTimestamp, max(Timestamp) AS LastTimestamp,
  unix_timestamp(max(Timestamp)) - unix_timestamp(min(Timestamp)) AS Duration,
  max(Category) AS LastCategory, min(ProductID) AS FirstProduct, max(ProductID) AS LastProduct,
  count(*) AS SessionCount
FROM Clicks GROUP BY SessionID ORDER BY SessionID
""")
dfsum.show()

In [21]:
# One subdirectory per LastCategory value; overwrite so the cell can be re-run
dfsum.write.partitionBy("LastCategory").format("parquet").mode("overwrite").saveAsTable("Clicks_Summary")
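`partitionBy("LastCategory")` writes Hive-style partitions: each distinct value gets its own subdirectory named `LastCategory=<value>` under the table's location, and the partition column is encoded in the path rather than stored in the data files. A simplified plain-Python sketch of that layout; `hive_partition_dirs` is a hypothetical name, and unlike Spark it neither escapes special characters in values nor routes nulls to a `__HIVE_DEFAULT_PARTITION__` directory.

```python
from collections import defaultdict


def hive_partition_dirs(table_root: str, column: str, rows: list) -> dict:
    """Group rows into Hive-style partition directories <table_root>/<column>=<value>.

    Simplified illustration: no escaping of special characters and no null handling.
    """
    dirs = defaultdict(list)
    for row in rows:
        path = f"{table_root.rstrip('/')}/{column}={row[column]}"
        # The partition value lives in the path, so drop it from the stored row
        dirs[path].append({k: v for k, v in row.items() if k != column})
    return dict(dirs)


if __name__ == "__main__":
    # Illustrative summary rows
    rows = [
        {"SessionID": 1, "LastCategory": "S"},
        {"SessionID": 2, "LastCategory": "0"},
        {"SessionID": 3, "LastCategory": "S"},
    ]
    for path, part in sorted(hive_partition_dirs("/user/hive/warehouse/clicks_summary/", "LastCategory", rows).items()):
        print(path, part)
```

Queries that filter on `LastCategory` can then skip whole directories, at the cost of many small directories if the column has many distinct values.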

This table will persist across cluster restarts and allow various users across different notebooks to query this data.

https://docs.azuredatabricks.net/data/databricks-file-system.html

In [23]:
%python
display(dbutils.fs.ls("/user/hive/warehouse/clicks_summary/"))