![image](http://static1.squarespace.com/static/570c9f9986db43b6bdd1bc72/t/570d19280442628cdb632273/1494519196089)

This is a Jupyter notebook created by Joseph Kambourakis as part of the application process for Authess.  It was created using Data Science Experience and may contain some code that is specific to that platform.

The notebook covers the following steps:
1.  Load Apache Spark libraries
2.  Download data from S3 and create a dataframe
3.  Parse the file
4.  Perform test query
5.  Write Apache Parquet file back to S3


# Step One: Load Libraries & Initialize Spark Session


In [1]:
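from pyspark.sql import SparkSession
# getOrCreate() reuses the SparkContext that the platform already provides
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext  # used below to set the S3 credentials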

# Step Two: Load data file from S3
Here, we read every file in the directory.  The path can be adjusted to read only the .log files, only the .txt files, or only the directories for the current date.  

We have to pass Amazon S3 authentication keys in Data Science Experience in order to access S3. The data file can be found here:  https://s3-us-west-2.amazonaws.com/authesskambourakis/sample+source+data.txt 

In [2]:
path_to_input = 's3a://authesskambourakis/*' #This is the directory I created in AWS with the sample file
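The path variations mentioned above (only .log files, only .txt files, or a date-based directory) could be generated with a small helper like the one below. This is a sketch only: `build_input_path` is a hypothetical name, and the `YYYY/MM/DD` directory layout is an assumption, since the sample bucket holds a single flat file.

```python
from datetime import date

BUCKET = 'authesskambourakis'  # bucket used in this notebook


def build_input_path(bucket, extension=None, day=None):
    """Return an s3a:// glob suitable for spark.read.json.

    extension: e.g. 'log' or 'txt' to match only those files.
    day: a datetime.date; assumes a hypothetical YYYY/MM/DD directory layout.
    """
    prefix = 's3a://{}/'.format(bucket)
    if day is not None:
        prefix += day.strftime('%Y/%m/%d/')
    pattern = '*' if extension is None else '*.{}'.format(extension)
    return prefix + pattern


print(build_input_path(BUCKET))                          # every file in the bucket
print(build_input_path(BUCKET, extension='log'))         # only .log files
print(build_input_path(BUCKET, day=date(2016, 7, 14)))   # one day's directory
```

The returned string can be passed to `spark.read.json` in place of the hard-coded path.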

In [3]:
import os

# Read the keys from the environment instead of hard-coding them in the notebook.
hconf = sc._jsc.hadoopConfiguration()
hconf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
hconf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
events = spark.read.json(path_to_input)

In [4]:
events.count()
type(events)

pyspark.sql.dataframe.DataFrame

An alternative, but less scalable, method is to download the file from the web with shell commands and then read it locally.  This version only reads one file.

In [None]:
!wget https://s3-us-west-2.amazonaws.com/authesskambourakis/sample+source+data.txt
!ls

In [None]:
events = spark.read.json('sample+source+data.txt')

# Step Three: Examine the Schema and Data
The printed schema shows that log_events is a nested field: an array of structs.  

In [5]:
events.printSchema()  # printSchema() prints the schema itself and returns None

root
 |-- assessment_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- log_events: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- altKey: string (nullable = true)
 |    |    |-- bubbles: string (nullable = true)
 |    |    |-- button: string (nullable = true)
 |    |    |-- buttons: string (nullable = true)
 |    |    |-- cancelable: string (nullable = true)
 |    |    |-- clientX: string (nullable = true)
 |    |    |-- clientY: string (nullable = true)
 |    |    |-- ctrlKey: string (nullable = true)
 |    |    |-- currentTarget: string (nullable = true)
 |    |    |-- detail: string (nullable = true)
 |    |    |-- eventPhase: string (nullable = true)
 |    |    |-- event_type: string (nullable = true)
 |    |    |-- felt_id: string (nullable = true)
 |    |    |-- metaKey: string (nullable = true)
 |    |    |-- offsetX: string (nullable = true)
 |    |    |-- offsetY: string (nullab

In [6]:
events.toPandas()

Unnamed: 0,assessment_id,created_at,id,log_events,user_id
0,2,7/14/2016 2:13:53 AM,9290,"[(false, true, 0, 0, true, 912, 353, false, BO...",26
1,2,7/14/2016 2:13:55 AM,9291,"[(false, true, 0, 0, true, 912, 353, true, BOD...",26
2,23,7/14/2016 2:14:05 AM,9292,"[(temp alt text, true, 0, 0, true, 925, 360, f...",22


# Step Four: Create a Temporary View to Subset the Nested Portion
![image](http://content.edupristine.com.s3.amazonaws.com/images/blogs/SparkSQL_JSONimg0.jpg)
Here we use Spark SQL to register the DataFrame as a temporary view so that we can use the [explode](https://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/sql/functions.html#explode(org.apache.spark.sql.Column)) function to expand the nested array into one row per element.
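To make the effect of explode concrete, here is a plain-Python sketch (not Spark code) of the same row expansion. The helper name `explode_rows` and the sample rows are made up for illustration; only the column names come from the notebook.

```python
def explode_rows(rows, array_field):
    """Return one output row per element of each row's array_field."""
    exploded = []
    for row in rows:
        for element in row[array_field]:
            new_row = dict(row)             # copy the parent columns
            new_row[array_field] = element  # replace the array with one element
            exploded.append(new_row)
    return exploded


# Tiny made-up sample shaped like the notebook's data (values are illustrative).
sample = [
    {'id': '9290', 'user_id': '26',
     'log_events': [{'clientX': '912'}, {'clientX': '927'}]},
    {'id': '9291', 'user_id': '26',
     'log_events': [{'clientX': '912'}]},
]

for row in explode_rows(sample, 'log_events'):
    print(row['id'], row['log_events']['clientX'])
```

Each parent row is repeated once per log event, which is why the `id` values repeat in the Spark output.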

In [7]:
events.createOrReplaceTempView('events')  # registerTempTable is deprecated as of Spark 2.0

event1 = spark.sql('select assessment_id, created_at, id, explode(log_events) as log_events, user_id from events')
event1.toPandas()

Unnamed: 0,assessment_id,created_at,id,log_events,user_id
0,2,7/14/2016 2:13:53 AM,9290,"(false, true, 0, 0, true, 912, 353, false, BOD...",26
1,2,7/14/2016 2:13:53 AM,9290,"(false, true, 0, 0, true, 912, 353, false, BOD...",26
2,2,7/14/2016 2:13:53 AM,9290,"(false, true, 0, 0, true, 912, 353, false, BOD...",26
3,2,7/14/2016 2:13:53 AM,9290,"(false, true, 0, 0, true, 927, 341, false, BOD...",26
4,2,7/14/2016 2:13:53 AM,9290,"(false, true, 0, 0, true, 936, 335, false, BOD...",26
5,2,7/14/2016 2:13:53 AM,9290,"(false, true, 0, 0, true, 886, 443, false, TEX...",26
6,2,7/14/2016 2:13:55 AM,9291,"(false, true, 0, 0, true, 912, 353, true, BODY...",26
7,2,7/14/2016 2:13:55 AM,9291,"(false, true, 0, 0, true, 912, 353, false, BOD...",26
8,2,7/14/2016 2:13:55 AM,9291,"(false, true, 0, 0, true, 912, 353, false, BOD...",26
9,2,7/14/2016 2:13:55 AM,9291,"(false, true, 0, 0, true, 927, 341, false, BOD...",26


This gives us one row per log event, but each event is still packed into a single struct column: the array seen in the printed schema has been unpacked, but the struct inside it has not.  The following command extracts that struct and turns its fields into top-level columns, which removes the nesting.  

In [8]:
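# Keep only the log_events struct from each row; its fields become the columns of the new DataFrame
event2 = event1.rdd.map(lambda row: row['log_events']).toDF()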
event2.toPandas()

Unnamed: 0,altKey,bubbles,button,buttons,cancelable,clientX,clientY,ctrlKey,currentTarget,detail,...,pageX,pageY,relatedTarget,screenX,screenY,shiftKey,target,timeStamp,toElement,which
0,false,True,0,0,True,912,353,False,BODY,0,...,912,353,,913,436,False,BODY,1468524065236,BODY,0
1,false,True,0,0,True,912,353,False,BODY,0,...,912,353,,913,436,False,BODY,1468524065238,BODY,0
2,false,True,0,0,True,912,353,False,BODY,0,...,912,353,,913,436,False,BODY,1468524065252,BODY,0
3,false,True,0,0,True,927,341,False,BODY,0,...,927,341,,928,424,False,BODY,1468524065255,BODY,0
4,false,True,0,0,True,936,335,False,BODY,0,...,936,335,,937,418,False,BODY,1468524065257,BODY,0
5,false,True,0,0,True,886,443,False,TEXTAREA,0,...,886,443,BUTTON,976,571,False,TEXTAREA,1470137027138,TEXTAREA,0
6,false,True,0,0,True,912,353,True,BODY,0,...,912,353,,913,436,False,BODY,1468524065237,BODY,0
7,false,True,0,0,True,912,353,False,BODY,0,...,912,353,,913,436,False,BODY,1468524065237,BODY,0
8,false,True,0,0,True,912,353,False,BODY,0,...,912,353,,913,436,False,BODY,1468524065252,BODY,0
9,false,True,0,0,True,927,341,False,BODY,0,...,927,341,,928,424,False,BODY,1468524065255,BODY,0


Extracting the struct dropped the four top-level columns.  Here we select them from the exploded DataFrame so that they can be joined back.  

In [9]:
event1 = event1.select('assessment_id', 'created_at', 'id', 'user_id')
event1.toPandas()

Unnamed: 0,assessment_id,created_at,id,user_id
0,2,7/14/2016 2:13:53 AM,9290,26
1,2,7/14/2016 2:13:53 AM,9290,26
2,2,7/14/2016 2:13:53 AM,9290,26
3,2,7/14/2016 2:13:53 AM,9290,26
4,2,7/14/2016 2:13:53 AM,9290,26
5,2,7/14/2016 2:13:53 AM,9290,26
6,2,7/14/2016 2:13:55 AM,9291,26
7,2,7/14/2016 2:13:55 AM,9291,26
8,2,7/14/2016 2:13:55 AM,9291,26
9,2,7/14/2016 2:13:55 AM,9291,26


We now need to rejoin the expanded log_events DataFrame with the four-column DataFrame.  A simple way to do this is to add an index to each, join on the index, and then drop the index.  This relies on both DataFrames assigning the same index to the same row.  

In [10]:
from pyspark.sql.functions import monotonically_increasing_id 

event1withindex = event1.withColumn("index", monotonically_increasing_id())
event1withindex.toPandas()

Unnamed: 0,assessment_id,created_at,id,user_id,index
0,2,7/14/2016 2:13:53 AM,9290,26,0
1,2,7/14/2016 2:13:53 AM,9290,26,1
2,2,7/14/2016 2:13:53 AM,9290,26,2
3,2,7/14/2016 2:13:53 AM,9290,26,3
4,2,7/14/2016 2:13:53 AM,9290,26,4
5,2,7/14/2016 2:13:53 AM,9290,26,5
6,2,7/14/2016 2:13:55 AM,9291,26,6
7,2,7/14/2016 2:13:55 AM,9291,26,7
8,2,7/14/2016 2:13:55 AM,9291,26,8
9,2,7/14/2016 2:13:55 AM,9291,26,9


In [11]:
event2withindex = event2.withColumn("index", monotonically_increasing_id())
event2withindex.toPandas()

Unnamed: 0,altKey,bubbles,button,buttons,cancelable,clientX,clientY,ctrlKey,currentTarget,detail,...,pageY,relatedTarget,screenX,screenY,shiftKey,target,timeStamp,toElement,which,index
0,false,True,0,0,True,912,353,False,BODY,0,...,353,,913,436,False,BODY,1468524065236,BODY,0,0
1,false,True,0,0,True,912,353,False,BODY,0,...,353,,913,436,False,BODY,1468524065238,BODY,0,1
2,false,True,0,0,True,912,353,False,BODY,0,...,353,,913,436,False,BODY,1468524065252,BODY,0,2
3,false,True,0,0,True,927,341,False,BODY,0,...,341,,928,424,False,BODY,1468524065255,BODY,0,3
4,false,True,0,0,True,936,335,False,BODY,0,...,335,,937,418,False,BODY,1468524065257,BODY,0,4
5,false,True,0,0,True,886,443,False,TEXTAREA,0,...,443,BUTTON,976,571,False,TEXTAREA,1470137027138,TEXTAREA,0,5
6,false,True,0,0,True,912,353,True,BODY,0,...,353,,913,436,False,BODY,1468524065237,BODY,0,6
7,false,True,0,0,True,912,353,False,BODY,0,...,353,,913,436,False,BODY,1468524065237,BODY,0,7
8,false,True,0,0,True,912,353,False,BODY,0,...,353,,913,436,False,BODY,1468524065252,BODY,0,8
9,false,True,0,0,True,927,341,False,BODY,0,...,341,,928,424,False,BODY,1468524065255,BODY,0,9


In [12]:
joined = event1withindex.join(event2withindex, on='index')
joined = joined.drop('index')
joined.toPandas()

Unnamed: 0,assessment_id,created_at,id,user_id,altKey,bubbles,button,buttons,cancelable,clientX,...,pageX,pageY,relatedTarget,screenX,screenY,shiftKey,target,timeStamp,toElement,which
0,2,7/14/2016 2:13:53 AM,9290,26,false,True,0,0,True,912,...,912,353,,913,436,False,BODY,1468524065236,BODY,0
1,2,7/14/2016 2:13:53 AM,9290,26,false,True,0,0,True,912,...,912,353,,913,436,False,BODY,1468524065238,BODY,0
2,2,7/14/2016 2:13:53 AM,9290,26,false,True,0,0,True,912,...,912,353,,913,436,False,BODY,1468524065252,BODY,0
3,2,7/14/2016 2:13:53 AM,9290,26,false,True,0,0,True,927,...,927,341,,928,424,False,BODY,1468524065255,BODY,0
4,2,7/14/2016 2:13:53 AM,9290,26,false,True,0,0,True,936,...,936,335,,937,418,False,BODY,1468524065257,BODY,0
5,2,7/14/2016 2:13:53 AM,9290,26,false,True,0,0,True,886,...,886,443,BUTTON,976,571,False,TEXTAREA,1470137027138,TEXTAREA,0
6,2,7/14/2016 2:13:55 AM,9291,26,false,True,0,0,True,912,...,912,353,,913,436,False,BODY,1468524065237,BODY,0
7,2,7/14/2016 2:13:55 AM,9291,26,false,True,0,0,True,912,...,912,353,,913,436,False,BODY,1468524065237,BODY,0
8,2,7/14/2016 2:13:55 AM,9291,26,false,True,0,0,True,912,...,912,353,,913,436,False,BODY,1468524065252,BODY,0
9,2,7/14/2016 2:13:55 AM,9291,26,false,True,0,0,True,927,...,927,341,,928,424,False,BODY,1468524065255,BODY,0
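A caveat on the index join above: `monotonically_increasing_id()` is documented to produce ids that are increasing and unique but not consecutive, with the partition index in the upper bits and the row's position within its partition in the lower 33 bits. The plain-Python sketch below imitates that layout to show what could happen if the two DataFrames were partitioned differently. The partition sizes are hypothetical and `simulated_ids` is an illustrative helper, not a Spark function.

```python
def simulated_ids(partition_sizes):
    """Mimic the documented layout of monotonically_increasing_id():
    partition index in the upper bits, row offset in the lower 33 bits."""
    ids = []
    for partition_index, size in enumerate(partition_sizes):
        for offset in range(size):
            ids.append((partition_index << 33) + offset)
    return ids


# Hypothetical layouts: the same 10 rows held in one partition vs. two.
left_ids = simulated_ids([10])
right_ids = simulated_ids([5, 5])

matched = set(left_ids) & set(right_ids)
print(len(matched))   # 5: only half the rows would survive an inner join
print(right_ids[5])   # 8589934592: first id of the second partition
```

In this notebook both sides received ids 0 through 9, which is consistent with each holding all of its rows in a single partition, so the join lines up. On larger inputs that is not guaranteed.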


In [13]:
joined.printSchema()

root
 |-- assessment_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- altKey: string (nullable = true)
 |-- bubbles: string (nullable = true)
 |-- button: string (nullable = true)
 |-- buttons: string (nullable = true)
 |-- cancelable: string (nullable = true)
 |-- clientX: string (nullable = true)
 |-- clientY: string (nullable = true)
 |-- ctrlKey: string (nullable = true)
 |-- currentTarget: string (nullable = true)
 |-- detail: string (nullable = true)
 |-- eventPhase: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- felt_id: string (nullable = true)
 |-- metaKey: string (nullable = true)
 |-- offsetX: string (nullable = true)
 |-- offsetY: string (nullable = true)
 |-- pageX: string (nullable = true)
 |-- pageY: string (nullable = true)
 |-- relatedTarget: string (nullable = true)
 |-- screenX: string (nullable = true)
 |-- screenY: string (nullable = true)
 |

# Step Five: Perform Sample Query
We'll run a simple query in Spark SQL, which should behave much like HiveQL.  Again, we need to register the DataFrame as a temporary view before querying it.  

In [14]:
joined.createOrReplaceTempView('logs_as_columns')  # registerTempTable is deprecated as of Spark 2.0


In [15]:
spark.sql("select * from logs_as_columns WHERE id='9290'").toPandas()

Unnamed: 0,assessment_id,created_at,id,user_id,altKey,bubbles,button,buttons,cancelable,clientX,...,pageX,pageY,relatedTarget,screenX,screenY,shiftKey,target,timeStamp,toElement,which
0,2,7/14/2016 2:13:53 AM,9290,26,False,True,0,0,True,912,...,912,353,,913,436,False,BODY,1468524065236,BODY,0
1,2,7/14/2016 2:13:53 AM,9290,26,False,True,0,0,True,912,...,912,353,,913,436,False,BODY,1468524065238,BODY,0
2,2,7/14/2016 2:13:53 AM,9290,26,False,True,0,0,True,912,...,912,353,,913,436,False,BODY,1468524065252,BODY,0
3,2,7/14/2016 2:13:53 AM,9290,26,False,True,0,0,True,927,...,927,341,,928,424,False,BODY,1468524065255,BODY,0
4,2,7/14/2016 2:13:53 AM,9290,26,False,True,0,0,True,936,...,936,335,,937,418,False,BODY,1468524065257,BODY,0
5,2,7/14/2016 2:13:53 AM,9290,26,False,True,0,0,True,886,...,886,443,BUTTON,976,571,False,TEXTAREA,1470137027138,TEXTAREA,0


# Step Six: Write to Parquet
The first command writes the DataFrame to the local file system.  The second writes it to S3.  

In [None]:
joined.write.parquet("joined.parquet")


In [16]:
joined.write.parquet('s3a://authesskambourakis/joined.parquet')
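By default, a Parquet write raises an error if the output path already exists, so re-running the cell above fails on the second attempt. A minimal sketch of a wrapper that makes the save mode explicit is shown below; `write_parquet` is a hypothetical helper name, and the default of `'overwrite'` is an assumption that suits a scratch notebook but deletes whatever is already at the path.

```python
def write_parquet(df, path, mode='overwrite'):
    """Write df as Parquet, choosing what happens if path already exists.

    mode is passed to DataFrameWriter.mode(): 'overwrite' replaces existing
    output, 'append' adds to it, and 'error' (Spark's default) raises.
    """
    df.write.mode(mode).parquet(path)
    return path
```

It could be called as `write_parquet(joined, 's3a://authesskambourakis/joined.parquet')`, and the result checked by comparing `spark.read.parquet(path).count()` with `joined.count()`.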

![image](https://github.com/JosephKambourakisIBM/authess/blob/master/Screen%20Shot%202017-05-15%20at%207.11.57%20PM.png?raw=true)

# Conclusion 
Overall, I met the requirement and enjoyed the exercise.  I didn't have much experience with Amazon Web Services, so this gave me a great opportunity to learn.  My solution could be improved.  I ran into some trouble trying to use explode on the log_events column and couldn't because it was an array and not a struct.  Changing the schema to account for this would avoid the need for the subsetting, indexing, and joining. I don't think my solution would work at scale because of the way indexing works on distributed data.  Please email me with any comments or thoughts at joseph.kambourakis@gmail.com or joseph.kambourakis@ibm.com.
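One possible way to avoid the subsetting, indexing, and joining is Spark's star expansion on struct columns, which promotes every field of a struct to a top-level column within a single `select`. The sketch below is an alternative to the notebook's approach, not what was actually run; `flatten_log_events` and `PARENT_COLUMNS` are hypothetical names, and the input is assumed to be the exploded DataFrame produced in cell In [7].

```python
PARENT_COLUMNS = ['assessment_id', 'created_at', 'id', 'user_id']


def flatten_log_events(exploded_df, struct_column='log_events'):
    """Keep the parent columns and promote every field of the struct column
    to a top-level column in one select (no index and no join)."""
    return exploded_df.select(*(PARENT_COLUMNS + [struct_column + '.*']))
```

Assuming `event1` still holds the exploded DataFrame from cell In [7] (cell In [9] later overwrites it with only four columns), this could be used as `joined = flatten_log_events(event1)`. Because no generated index is involved, the result does not depend on how the data is partitioned.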

thanks!
Joe