# Question formulation Notebook

Use of toy dataset and notebook dependencies.

### Notebook Set-up:

In [2]:
import ast
import re
from os import path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pyspark.sql.functions import when

In [3]:
# Run `pwd` in a shell; IPython captures the command's output as a list of lines.
PWD = !pwd
# Keep only the first line of that output.
PWD = PWD[0]

In [156]:
from pyspark.sql import SparkSession

# Settings handed to the session builder below.
app_name = "w261-FinalProject"
master = "local[*]"

# Build the session (or reuse an existing one) and expose its SparkContext,
# which the RDD-based cells use as `sc`.
spark = (
    SparkSession.builder
    .appName(app_name)
    .master(master)
    .getOrCreate()
)
sc = spark.sparkContext

In [5]:
# Load the raw train and test text files from the project bucket as RDDs
# so we can inspect their format.
trainRDD, testRDD = (
    sc.textFile("gs://w261_final-project_team13/train.txt"),
    sc.textFile("gs://w261_final-project_team13/test.txt"),
)

In [18]:
# Show the first two raw lines of the training file.
trainRDD.take(2)

['0\t1\t1\t5\t0\t1382\t4\t15\t2\t181\t1\t2\t\t2\t68fd1e64\t80e26c9b\tfb936136\t7b4723c4\t25c83c98\t7e0ccccf\tde7995b8\t1f89b562\ta73ee510\ta8cd5504\tb2cb9c98\t37c9c164\t2824a5f6\t1adce6ef\t8ba8b39a\t891b62e7\te5ba7672\tf54016b9\t21ddcdc9\tb1252a9d\t07b5194c\t\t3a171ecb\tc5c50484\te8b83407\t9727dd16',
 '0\t2\t0\t44\t1\t102\t8\t2\t2\t4\t1\t1\t\t4\t68fd1e64\tf0cf0024\t6f67f7e5\t41274cd7\t25c83c98\tfe6b92e5\t922afcc0\t0b153874\ta73ee510\t2b53e5fb\t4f1b46f3\t623049e6\td7020589\tb28479f6\te6c5b5cd\tc92f3b61\t07c540c4\tb04e4670\t21ddcdc9\t5840adea\t60f6221e\t\t3a171ecb\t43f13e8b\te8b83407\t731c3655']

In [19]:
# Show the first two raw lines of the test file.
testRDD.take(2)

['\t29\t50\t5\t7260\t437\t1\t4\t14\t\t1\t0\t6\t5a9ed9b0\ta0e12995\ta1e14474\t08a40877\t25c83c98\t\t964d1fdd\t5b392875\ta73ee510\tde89c3d2\t59cd5ae7\t8d98db20\t8b216f7b\t1adce6ef\t78c64a1d\t3ecdadf7\t3486227d\t1616f155\t21ddcdc9\t5840adea\t2c277e62\t\t423fab69\t54c91918\t9b3e8820\te75c9ae9',
 '27\t17\t45\t28\t2\t28\t27\t29\t28\t1\t1\t\t23\t68fd1e64\t960c983b\t9fbfbfd5\t38c11726\t25c83c98\t7e0ccccf\tfe06fd10\t062b5529\ta73ee510\tca53fc84\t67360210\t895d8bbb\t4f8e2224\tf862f261\tb4cc2435\t4c0041e5\te5ba7672\tb4abdd09\t21ddcdc9\t5840adea\t36a7ab86\t\t32c7478e\t85e4d73f\t010f6491\tee63dd9b']

We see that both files are tab-separated. We want to sample each of them into a file small enough for single-node computation and copy the samples back to our local machines. For that, we first need to know how many observations we have:

In [20]:
# Count the observations in each full dataset; each count is printed as soon as it is computed.
n_train_rows = trainRDD.count()
print('Train dataset count:', n_train_rows, 'observations.')

n_test_rows = testRDD.count()
print('Test dataset count:', n_test_rows, 'observations.')

Train dataset count: 45840617 observations.
Test dataset count: 6042135 observations.


Based on that, we will take 0.3% of the dataset as a sample, which will be roughly $45{,}840{,}617 \cdot 0.003 \approx 137{,}521$ observations and $10.38 \times 10^{3} \cdot 0.003 \approx 31$ MB: easily handled on a single-node machine and still relevant. For the test dataset the same sample ratio will be kept.

Another point is that the text files do not have headers, and as we want to work with dataframes, we want to create a schema. To do that, we can take a look at the `readme.txt` file supplied with the data:

```
====================================================

Format:

The columns are tab separeted with the following schema:
<label> <integer feature 1> ... <integer feature 13> <categorical feature 1> ... <categorical feature 26>

When a value is missing, the field is just empty.
There is no label field in the test set.

====================================================
```

Additionally, we need to parse the data, going from lines of strings to integers and strings. For that we can map the RDD after sampling and convert to the desired types:

In [6]:
# Column names in file order: 13 integer features (I1..I13) followed by
# 26 categorical features (C1..C26). The train file has an extra leading
# 'label' column; the test file does not.
labelsTest = ['I%d' % i for i in range(1, 14)] + ['C%d' % i for i in range(1, 27)]

labelsTrain = ['label'] + labelsTest

# Draw a 0.3% sample (without replacement, fixed seed 2019) from each RDD,
# split every line on tabs, and name the resulting columns.
toyTrainDF = (
    trainRDD
    .sample(withReplacement=False, fraction=0.003, seed=2019)
    .map(lambda row: row.split('\t'))
    .toDF(labelsTrain)
)
toyTestDF = (
    testRDD
    .sample(withReplacement=False, fraction=0.003, seed=2019)
    .map(lambda row: row.split('\t'))
    .toDF(labelsTest)
)

In [42]:
# Sanity-check the size of each sampled dataframe.
n_toy_train = toyTrainDF.count()
print('Toy train dataframe count:', n_toy_train)

n_toy_test = toyTestDF.count()
print('Toy test dataframe count:', n_toy_test)

Toy train dataframe count: 137139
Toy test dataframe count: 18181


In [44]:
# Write out the toy datasets so they can be copied to, and used on, local machines.
# NOTE: despite the ".txt" suffix these are Parquet outputs; the paths are kept
# as-is because the copy and read cells refer to them by these names.
# mode("overwrite") makes the cell re-runnable: with the default save mode the
# write fails if the target path already exists.
# NOTE(review): the recorded "No FileSystem for scheme: gs" error suggests this cell
# was last run on a Spark install without GCS support; confirm it runs on the cluster.
toyTrainDF.write.mode("overwrite").parquet("gs://w261_final-project_team13/toy_train.txt")
toyTestDF.write.mode("overwrite").parquet("gs://w261_final-project_team13/toy_test.txt")

Py4JJavaError: An error occurred while calling o219.parquet.
: java.io.IOException: No FileSystem for scheme: gs
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:452)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:548)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:278)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:547)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


### Now running on the local machine:

In [45]:
# copy the files to the local machine:
!gsutil -m cp gs://w261_final-project_team13/toy_test.txt/* .data/toy_test.txt/
!gsutil -m cp gs://w261_final-project_team13/toy_train.txt/* .data/toy_train.txt/
gsutil cp gs://w261_final-project_team13/notebooks/* ./QuestionFormulation.ipynb

/media/notebooks/Assignments/FinalProject/W261-SP19-Team13-FinalProject


In [188]:
# Load the locally copied Parquet outputs back into dataframes;
# the next cells print the first observation of each.
toy_train_path = "./data/toy_train.txt"
toy_test_path = "./data/toy_test.txt"

toyTrainDF = spark.read.parquet(toy_train_path)
toyTestDF = spark.read.parquet(toy_test_path)

In [189]:
# First row of the toy train dataframe as loaded (every field is still a string).
toyTrainDF.head()

Row(label='0', I1='1', I2='2', I3='17', I4='3', I5='685', I6='16', I7='2', I8='3', I9='7', I10='1', I11='2', I12='1', I13='3', C1='05db9164', C2='38a947a1', C3='78c2dcf9', C4='041c8b35', C5='4cf72387', C6='6f6d9be8', C7='94aa68fb', C8='1f89b562', C9='a73ee510', C10='ac25feb9', C11='577aa337', C12='5b91fbfa', C13='f405e2e8', C14='07d13a8f', C15='a8041309', C16='15913bcf', C17='3486227d', C18='998b9a30', C19='', C20='', C21='3e2fae11', C22='', C23='32c7478e', C24='09a589c1', C25='', C26='')

In [190]:
# First row of the toy test dataframe as loaded (every field is still a string).
toyTestDF.head()

Row(I1='', I2='-1', I3='', I4='', I5='8124', I6='62', I7='5', I8='1', I9='37', I10='', I11='1', I12='', I13='', C1='05db9164', C2='08c2f5df', C3='dde182a0', C4='de1dc0c1', C5='25c83c98', C6='fbad5c96', C7='ad3508b1', C8='0b153874', C9='a73ee510', C10='965e1030', C11='ad757a5a', C12='7a27d4e1', C13='93b18cb5', C14='1adce6ef', C15='a43baafd', C16='84534f54', C17='e5ba7672', C18='29b0e3e5', C19='', C20='', C21='2b81e06c', C22='c9d4222a', C23='423fab69', C24='2f647dfe', C25='', C26='')

We see that all features are strings. We want to cast the `label` feature to _Boolean_ and the `I1` to `I13` features to _integers_, and replace empty strings in the categorical features `C1` to `C26` with `None`:

In [191]:
# Give the toy dataframes proper types. Requires `when` from
# pyspark.sql.functions to be imported (see the notebook's import cell).

# The label only exists in the train set; cast it from string to Boolean.
toyTrainDF = toyTrainDF.withColumn('label', toyTrainDF.label.cast('Boolean'))

# Integer features: cast from string to Integer in both dataframes.
# The loop variable is named `column` (not `col`) so it cannot shadow
# pyspark.sql.functions.col if that is imported elsewhere.
intColumns = ['I1','I2','I3','I4','I5','I6','I7','I8','I9','I10','I11','I12','I13']
for column in intColumns:
    toyTrainDF = toyTrainDF.withColumn(column, toyTrainDF[column].cast('Integer'))
    toyTestDF = toyTestDF.withColumn(column, toyTestDF[column].cast('Integer'))

# Categorical features: keep non-empty strings and turn empty strings into
# None so that missing values are represented uniformly.
strColumns = ['C1','C2','C3','C4','C5','C6','C7','C8','C9','C10','C11','C12','C13','C14',
              'C15','C16','C17','C18','C19','C20','C21','C22','C23','C24','C25','C26']
for column in strColumns:
    toyTrainDF = toyTrainDF.withColumn(column, when(toyTrainDF[column] != "", toyTrainDF[column]).otherwise(None))
    toyTestDF = toyTestDF.withColumn(column, when(toyTestDF[column] != "", toyTestDF[column]).otherwise(None))

In [192]:
# First row of the toy train dataframe after casting, to verify the new types.
toyTrainDF.head()

Row(label=False, I1=1, I2=2, I3=17, I4=3, I5=685, I6=16, I7=2, I8=3, I9=7, I10=1, I11=2, I12=1, I13=3, C1='05db9164', C2='38a947a1', C3='78c2dcf9', C4='041c8b35', C5='4cf72387', C6='6f6d9be8', C7='94aa68fb', C8='1f89b562', C9='a73ee510', C10='ac25feb9', C11='577aa337', C12='5b91fbfa', C13='f405e2e8', C14='07d13a8f', C15='a8041309', C16='15913bcf', C17='3486227d', C18='998b9a30', C19=None, C20=None, C21='3e2fae11', C22=None, C23='32c7478e', C24='09a589c1', C25=None, C26=None)

In [193]:
# First row of the toy test dataframe after casting, to verify the new types.
toyTestDF.head()

Row(I1=None, I2=-1, I3=None, I4=None, I5=8124, I6=62, I7=5, I8=1, I9=37, I10=None, I11=1, I12=None, I13=None, C1='05db9164', C2='08c2f5df', C3='dde182a0', C4='de1dc0c1', C5='25c83c98', C6='fbad5c96', C7='ad3508b1', C8='0b153874', C9='a73ee510', C10='965e1030', C11='ad757a5a', C12='7a27d4e1', C13='93b18cb5', C14='1adce6ef', C15='a43baafd', C16='84534f54', C17='e5ba7672', C18='29b0e3e5', C19=None, C20=None, C21='2b81e06c', C22='c9d4222a', C23='423fab69', C24='2f647dfe', C25=None, C26=None)

Now we can start analyzing the data to understand which features are more likely to contribute to a model and which are more likely to bias our model (for example, due to a high number of `None` values or a need for normalization).