## KKChurn model 
### Team members: Lorena Mejía, Alfredo Carrillo, Rodrigo Diseñador and Ricardo Figueroa

In [1]:
import numpy as np
import csv
import sys
import pandas as pd
from pyspark import SparkContext, SparkConf
import os
import re
from pyspark.sql import SparkSession

In [2]:
# We create the spark application called 'KKchurn' to use all the available cores
conf = SparkConf().setAppName('KKChurn').setMaster("local[*]").set("spark.driver.maxResultSize", "6g")
sc=SparkContext(conf=conf)
spark = SparkSession(sc)

In [3]:
# Import transactions file using Pyspark
transactions=sc.textFile('./data/transactions.csv') \
    .map(lambda line: line.split(","))
transactions=transactions.toDF(transactions.first())
transactions = transactions.rdd.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys().toDF()

In [59]:
# Import transactions_v2 file using Pyspark
transactions_2=sc.textFile('./data/churn_comp_refresh/transactions_v2.csv') \
    .map(lambda line: line.split(","))
transactions_2=transactions_2.toDF(transactions_2.first())
transactions_2 = transactions_2.rdd.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys().toDF()

In [5]:
# Import train data (before updates)
train_data=sc.textFile('./data/train.csv') \
    .map(lambda line: line.split(","))
train_data=train_data.toDF(train_data.first())
train_data = train_data.rdd.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys().toDF()

In [6]:
# Import train data v2 
train_data_2=sc.textFile('./data/churn_comp_refresh/train_v2.csv') \
    .map(lambda line: line.split(","))
train_data_2=train_data_2.toDF(train_data_2.first())
train_data_2 = train_data_2.rdd.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys().toDF()

In [7]:
sample_submission_zero = sc.textFile('./data/sample_submission_zero.csv') \
    .map(lambda line: line.split(","))
sample_submission_zero=sample_submission_zero.toDF(sample_submission_zero.first())
sample_submission_zero = sample_submission_zero.rdd.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys().toDF()

In [8]:
sample_submission_2 = sc.textFile('./data/churn_comp_refresh/sample_submission_v2.csv') \
    .map(lambda line: line.split(","))
sample_submission_2=sample_submission_2.toDF(sample_submission_2.first())
sample_submission_2 = sample_submission_2.rdd.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys().toDF()

In [9]:
user_logs = sc.textFile('./data/user_logs.csv') \
    .map(lambda line: line.split(","))
user_logs=user_logs.toDF(user_logs.first())
user_logs = user_logs.rdd.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys().toDF()

In [10]:
# Example to show a dataframe from Spark
user_logs.show()

+--------------------+--------+------+------+------+-------+-------+-------+----------+
|                msno|    date|num_25|num_50|num_75|num_985|num_100|num_unq|total_secs|
+--------------------+--------+------+------+------+-------+-------+-------+----------+
|rxIP2f2aN0rYNp+to...|20150513|     0|     0|     0|      0|      1|      1|  280.3350|
|rxIP2f2aN0rYNp+to...|20150709|     9|     1|     0|      0|      7|     11| 1658.9480|
|yxiEWwE9VR5utpUec...|20150105|     3|     3|     0|      0|     68|     36|17364.9560|
|yxiEWwE9VR5utpUec...|20150306|     1|     0|     1|      1|     97|     27|24667.3170|
|yxiEWwE9VR5utpUec...|20150501|     3|     0|     0|      0|     38|     38| 9649.0290|
|yxiEWwE9VR5utpUec...|20150702|     4|     0|     1|      1|     33|     10|10021.5200|
|yxiEWwE9VR5utpUec...|20150830|     3|     1|     0|      0|      4|      7| 1119.5550|
|yxiEWwE9VR5utpUec...|20151107|     1|     0|     0|      0|      4|      5|  938.0220|
|yxiEWwE9VR5utpUec...|20160110| 

In [11]:
user_logs_2 = sc.textFile('./data/user_logs.csv') \
    .map(lambda line: line.split(","))
user_logs_2=user_logs_2.toDF(user_logs_2.first())
user_logs_2 = user_logs_2.rdd.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys().toDF()

In [12]:
members = sc.textFile('./data/members_v3.csv') \
    .map(lambda line: line.split(","))
members=members.toDF(members.first())
members = members.rdd.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys().toDF()

In [13]:
# Example of a query using pyspark
aggr_value = transactions.select("msno").show()

+--------------------+
|                msno|
+--------------------+
|YyO+tlZtAXYXoZhNr...|
|AZtu6Wl0gPojrEQYB...|
|UkDFI97Qb6+s2LWci...|
|M1C56ijxozNaGD0t2...|
|yvj6zyBUaqdbUQSrK...|
|KN7I82kjY0Tn76Ny9...|
|m5ptKif9BjdUghHXX...|
|uQxbyACsPOEkTIrv9...|
|LUPRfoE2r3WwVWhYO...|
|pMVjPLgVknaJYm9L0...|
|bQkbrEPdMfVfdsoz0...|
|TZVCT9pCufI/AWjrG...|
|b2AiGMFhT6fbDyN12...|
|ksInNb4D5jdSSIYUr...|
|aQKXNflQtXF92cpv4...|
|iFxPpElVK6kXnZbuh...|
|8qrtRZQTuCih4YJhj...|
|pE2FeJOBZv5snDGdF...|
|vma4rQzDa/l4Wb/My...|
|Qw6UVFUknPVOLxSSs...|
+--------------------+
only showing top 20 rows



In [64]:
transactions.show()

+--------------------+-----------------+-----------------+---------------+------------------+-------------+----------------+----------------------+---------+
|                msno|payment_method_id|payment_plan_days|plan_list_price|actual_amount_paid|is_auto_renew|transaction_date|membership_expire_date|is_cancel|
+--------------------+-----------------+-----------------+---------------+------------------+-------------+----------------+----------------------+---------+
|YyO+tlZtAXYXoZhNr...|               41|               30|            129|               129|            1|        20150930|              20151101|        0|
|AZtu6Wl0gPojrEQYB...|               41|               30|            149|               149|            1|        20150930|              20151031|        0|
|UkDFI97Qb6+s2LWci...|               41|               30|            129|               129|            1|        20150930|              20160427|        0|
|M1C56ijxozNaGD0t2...|               39|            

### Train y Test Data
The train and test set, containing the user ids and whether they have churned. 
- msno: user id
- is_churn: This is the target variable. Churn is defined as whether the user did not continue the subscription within 30 days of expiration. is_churn = 1 means churn,is_churn = 0 means renewal.


### Transactions Data
The information and variables included in this file are the following:
- MSNO: id of the user
- payment_method_id: payment method
- payment_plan_days: length of membership plan in days
- plan_list_price: in New Taiwan Dollar (NTD)
- actual_amount_paid: in New Taiwan Dollar (NTD)
- is_auto_renew
- transaction_date: format %Y%m%d
- membership_expire_date: format %Y%m%d
- is_cancel: whether or not the user canceled the membership in this transaction.

### User logs
Daily user logs describing listening behaviors of a user. Data collected until 2/28/2017. 
The information and variables included in this file are the following:
- msno: user id
- date: format %Y%m%d
- num_25: # of songs played less than 25% of the song length
- num_50: # of songs played between 25% to 50% of the song length
- num_75: # of songs played between 50% to 75% of of the song length
- num_985: # of songs played between 75% to 98.5% of the song length
- num_100: # of songs played over 98.5% of the song length
- num_unq: # of unique songs played

### Members
user information. Note that not every user in the dataset is available.
- msno
- city
- bd: age. Note: this column has outlier values ranging from -7000 to 2015, please use your judgement.
- gender
- registered_via: registration method
- registration_init_time: format %Y%m%d
- expiration_date: format %Y%m%d, taken as a snapshot at which the member.csv is extracted. Not representing the actual churn behavior.

### Descripción de las variables de cada una de las tablas y de los tipos de variables

In [18]:
train_data.printSchema()

root
 |-- msno: string (nullable = true)
 |-- is_churn: string (nullable = true)



In [19]:
transactions.printSchema()

root
 |-- msno: string (nullable = true)
 |-- payment_method_id: string (nullable = true)
 |-- payment_plan_days: string (nullable = true)
 |-- plan_list_price: string (nullable = true)
 |-- actual_amount_paid: string (nullable = true)
 |-- is_auto_renew: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- membership_expire_date: string (nullable = true)
 |-- is_cancel: string (nullable = true)



In [20]:
sample_submission_zero.printSchema()

root
 |-- msno: string (nullable = true)
 |-- is_churn: string (nullable = true)



In [21]:
user_logs.printSchema()

root
 |-- msno: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_25: string (nullable = true)
 |-- num_50: string (nullable = true)
 |-- num_75: string (nullable = true)
 |-- num_985: string (nullable = true)
 |-- num_100: string (nullable = true)
 |-- num_unq: string (nullable = true)
 |-- total_secs: string (nullable = true)



In [22]:
# Guardar ciertos datos en una data frame
pd.DataFrame(train_data.take(5), columns=train_data.columns)

Unnamed: 0,msno,is_churn
0,waLDQMmcOu2jLDaV1ddDkgCrB/jl6sD66Xzs0Vqax1Y=,1
1,QA7uiXy8vIbUSPOkCf9RwQ3FsT8jVq2OxDr8zqa7bRQ=,1
2,fGwBva6hikQmTJzrbz/2Ezjm5Cth5jZUNvXigKK2AFA=,1
3,mT5V8rEpa+8wuqi6x0DoVd3H5icMKkE9Prt49UlmK+4=,1
4,XaPhtGLk/5UvvOYHcONTwsnH97P4eGECeq+BARGItRw=,1


In [23]:
df = user_logs.select('msno', user_logs.num_25.cast('float').alias('num_25'))

In [24]:
pd.DataFrame(df.take(5), columns=df.columns)

Unnamed: 0,msno,num_25
0,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,0.0
1,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,9.0
2,yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=,3.0
3,yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=,1.0
4,yxiEWwE9VR5utpUecLxVdQ5B7NysUPfrNtGINaM2zA8=,3.0


In [25]:
df.take(5)[2][1]

3.0

### Transformación de las columnas de string al tipo de datos que requerimos 

In [26]:
# Train data
train_data = train_data.select('msno', train_data.is_churn.cast('integer').alias('is_churn'))

In [27]:
train_data.printSchema()

root
 |-- msno: string (nullable = true)
 |-- is_churn: integer (nullable = true)



In [28]:
# function to transform date forma 'yyyymmdd' -> 'yyyy-mm-dd' -> date(yyyy-mm-dd)
def trans_date(date_string):
    year = date_string[0:4]
    month = date_string[4:6]
    day = date_string[6:8]
    return(year+'-'+month+'-'+day)

In [29]:
trans_date('19991201')

'1999-12-01'

In [30]:
from pyspark.sql.functions import udf
trans_date_udf = udf(trans_date)

In [31]:
transactions_clean_date = transactions.withColumn("transaction_date", trans_date_udf("transaction_date"))

In [32]:
# Transactions
transactions_clean_final = transactions_clean_date.select(
    'msno', 
    transactions_clean_date.payment_method_id.cast('integer').alias('payment_method_id'),
    transactions_clean_date.payment_plan_days.cast('integer').alias('payment_plan_days'),
    transactions_clean_date.plan_list_price.cast('integer').alias('plan_list_price'),
    transactions_clean_date.actual_amount_paid.cast('integer').alias('actual_amount_paid'),
    transactions_clean_date.is_auto_renew.cast('integer').alias('is_auto_renew'),
    transactions_clean_date.transaction_date.cast('date').alias('transaction_date'),
    transactions_clean_date.membership_expire_date.cast('date').alias('membership_expire_date'),
    transactions_clean_date.is_cancel.cast('integer').alias('is_cancel'),
)

In [33]:
transactions_clean_final.printSchema()

root
 |-- msno: string (nullable = true)
 |-- payment_method_id: integer (nullable = true)
 |-- payment_plan_days: integer (nullable = true)
 |-- plan_list_price: integer (nullable = true)
 |-- actual_amount_paid: integer (nullable = true)
 |-- is_auto_renew: integer (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- membership_expire_date: date (nullable = true)
 |-- is_cancel: integer (nullable = true)



In [1]:
pd.DataFrame(transactions_clean_final.take(5), columns=transactions_clean_final.columns)

NameError: name 'pd' is not defined

In [36]:
# Test data
sample_submission_zero = sample_submission_zero.select('msno', sample_submission_zero.is_churn.cast('integer').alias('is_churn'))

In [37]:
sample_submission_zero.printSchema()

root
 |-- msno: string (nullable = true)
 |-- is_churn: integer (nullable = true)



In [39]:
# User logs
# Cambiar las fechas de formato
user_logs_clean_date = user_logs.withColumn("date", trans_date_udf("date"))

In [40]:
user_logs.printSchema()

root
 |-- msno: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_25: string (nullable = true)
 |-- num_50: string (nullable = true)
 |-- num_75: string (nullable = true)
 |-- num_985: string (nullable = true)
 |-- num_100: string (nullable = true)
 |-- num_unq: string (nullable = true)
 |-- total_secs: string (nullable = true)



In [41]:
user_logs_clean_date = user_logs_clean_date.select(
    'msno', 
    user_logs_clean_date.date.cast('date').alias('date'),
    user_logs_clean_date.num_25.cast('integer').alias('num_25'),
    user_logs_clean_date.num_50.cast('integer').alias('num_50'),
    user_logs_clean_date.num_75.cast('integer').alias('num_75'),
    user_logs_clean_date.num_985.cast('integer').alias('num_985'),
    user_logs_clean_date.num_100.cast('integer').alias('num_100'),
    user_logs_clean_date.num_unq.cast('integer').alias('num_unq'),
    user_logs_clean_date.total_secs.cast('float').alias('total_secs'),
)

In [44]:
user_logs_clean_date.show()

+--------------------+----------+------+------+------+-------+-------+-------+----------+
|                msno|      date|num_25|num_50|num_75|num_985|num_100|num_unq|total_secs|
+--------------------+----------+------+------+------+-------+-------+-------+----------+
|rxIP2f2aN0rYNp+to...|2015-05-13|     0|     0|     0|      0|      1|      1|   280.335|
|rxIP2f2aN0rYNp+to...|2015-07-09|     9|     1|     0|      0|      7|     11|  1658.948|
|yxiEWwE9VR5utpUec...|2015-01-05|     3|     3|     0|      0|     68|     36| 17364.955|
|yxiEWwE9VR5utpUec...|2015-03-06|     1|     0|     1|      1|     97|     27| 24667.316|
|yxiEWwE9VR5utpUec...|2015-05-01|     3|     0|     0|      0|     38|     38|  9649.029|
|yxiEWwE9VR5utpUec...|2015-07-02|     4|     0|     1|      1|     33|     10|  10021.52|
|yxiEWwE9VR5utpUec...|2015-08-30|     3|     1|     0|      0|      4|      7|  1119.555|
|yxiEWwE9VR5utpUec...|2015-11-07|     1|     0|     0|      0|      4|      5|   938.022|
|yxiEWwE9V

In [49]:
# Members
# We have to also change the format_ of date
members_clean_date = members.withColumn("registration_init_time", trans_date_udf("registration_init_time"))

In [50]:
members_clean_date = members_clean_date.select(
    'msno', 
    'city',
    members_clean_date.bd.cast('integer').alias('bd'),
    'gender',
    members_clean_date.registered_via.cast('integer').alias('registered_via'),
    members_clean_date.registration_init_time.cast('date').alias('registration_init_time')
)

In [52]:
members_clean_date.printSchema()

root
 |-- msno: string (nullable = true)
 |-- city: string (nullable = true)
 |-- bd: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: integer (nullable = true)
 |-- registration_init_time: date (nullable = true)



In [53]:
members_clean_date.show()

+--------------------+----+---+------+--------------+----------------------+
|                msno|city| bd|gender|registered_via|registration_init_time|
+--------------------+----+---+------+--------------+----------------------+
|Rb9UwLQTrxzBVwCB6...|   1|  0|      |            11|            2011-09-11|
|+tJonkh+O1CA796Fm...|   1|  0|      |             7|            2011-09-14|
|cV358ssn7a0f7jZOw...|   1|  0|      |            11|            2011-09-15|
|9bzDeJP6sQodK73K5...|   1|  0|      |            11|            2011-09-15|
|WFLY3s7z4EZsieHCt...|   6| 32|female|             9|            2011-09-15|
|yLkV2gbZ4GLFwqTOX...|   4| 30|  male|             9|            2011-09-16|
|jNCGK78YkTyId3H3w...|   1|  0|      |             7|            2011-09-16|
|WH5Jq4mgtfUFXh2yz...|   5| 34|  male|             9|            2011-09-16|
|tKmbR4X5VXjHmxERr...|   5| 19|  male|             9|            2011-09-17|
|I0yFvqMoNkM8ZNHb6...|  13| 63|  male|             9|            2011-09-18|

### Clean updated datasets

In [55]:
# Train data v2
train_data_2 = train_data_2.select('msno', train_data_2.is_churn.cast('integer').alias('is_churn'))

In [60]:
# Transactions v2
transactions_2_clean_date = transactions_2.withColumn("transaction_date", trans_date_udf("transaction_date"))

In [61]:
transactions_2_clean_date = transactions_2_clean_date.select(
    'msno', 
    transactions_2_clean_date.payment_method_id.cast('integer').alias('payment_method_id'),
    transactions_2_clean_date.payment_plan_days.cast('integer').alias('payment_plan_days'),
    transactions_2_clean_date.plan_list_price.cast('integer').alias('plan_list_price'),
    transactions_2_clean_date.actual_amount_paid.cast('integer').alias('actual_amount_paid'),
    transactions_2_clean_date.is_auto_renew.cast('integer').alias('is_auto_renew'),
    transactions_2_clean_date.transaction_date.cast('date').alias('transaction_date'),
    transactions_2_clean_date.membership_expire_date.cast('date').alias('membership_expire_date'),
    transactions_2_clean_date.is_cancel.cast('integer').alias('is_cancel'),
)

In [62]:
# User logs
user_logs_2_clean_date = user_logs_2.withColumn("date", trans_date_udf("date"))

In [63]:
user_logs_2_clean_date = user_logs_2_clean_date.select(
    'msno', 
    user_logs_2_clean_date.date.cast('date').alias('date'),
    user_logs_2_clean_date.num_25.cast('integer').alias('num_25'),
    user_logs_2_clean_date.num_50.cast('integer').alias('num_50'),
    user_logs_2_clean_date.num_75.cast('integer').alias('num_75'),
    user_logs_2_clean_date.num_985.cast('integer').alias('num_985'),
    user_logs_2_clean_date.num_100.cast('integer').alias('num_100'),
    user_logs_2_clean_date.num_unq.cast('integer').alias('num_unq'),
    user_logs_2_clean_date.total_secs.cast('float').alias('total_secs'),
)

In [65]:
# Members_2, no existe un archivo de miembros actualizado

### Preparing data for machine learning

In [66]:
# Vamos a juntar las tablas en una sola

In [67]:
transactions_clean_final.printSchema()

root
 |-- msno: string (nullable = true)
 |-- payment_method_id: integer (nullable = true)
 |-- payment_plan_days: integer (nullable = true)
 |-- plan_list_price: integer (nullable = true)
 |-- actual_amount_paid: integer (nullable = true)
 |-- is_auto_renew: integer (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- membership_expire_date: date (nullable = true)
 |-- is_cancel: integer (nullable = true)



In [68]:
user_logs_clean_date.printSchema()

root
 |-- msno: string (nullable = true)
 |-- date: date (nullable = true)
 |-- num_25: integer (nullable = true)
 |-- num_50: integer (nullable = true)
 |-- num_75: integer (nullable = true)
 |-- num_985: integer (nullable = true)
 |-- num_100: integer (nullable = true)
 |-- num_unq: integer (nullable = true)
 |-- total_secs: float (nullable = true)



In [70]:
members_clean_date.printSchema()

root
 |-- msno: string (nullable = true)
 |-- city: string (nullable = true)
 |-- bd: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: integer (nullable = true)
 |-- registration_init_time: date (nullable = true)



In [83]:
#user_logs_clean_date.filter((user_logs_clean_date.msno != 'a') & (user_logs_clean_date.bar != 'b'))
user_logs_clean_date_unique_users = user_logs_clean_date.select('msno').distinct()

In [85]:
# Vamos a hacer una tabla "resumen" por usuario de "user logs" que se conforme de lo siguiente:
# última fecha de log, promedio de num_25 - num_unq y mediana
for row in user_logs_clean_date_unique_users.rdd.toLocalIterator():
    print(row)

KeyboardInterrupt: 

In [None]:
temp_data = user_logs_clean_date.filter((user_logs_clean_date.msno == row.msno))
# recibimis una spark data frame y debemos aplica un reduce para calcular el promedio, mediana, etc
temp_data_median_num_25 = temp_data.stat.approxQuantile("num_25", [0.5], 0.25)
temp_data_average_num_25 = temp_data.stat.agg({"num_25": "avg"})

temp_data_median_num_50 = temp_data.stat.approxQuantile("num_50", [0.5], 0.25)
temp_data_average_num_50 = temp_data.stat.agg({"num_50": "avg"})

temp_data_median_num_75 = temp_data.stat.approxQuantile("num_75", [0.5], 0.25)
temp_data_average_num_75 = temp_data.stat.agg({"num_75": "avg"})

temp_data_median_num_985 = temp_data.stat.approxQuantile("num_985", [0.5], 0.25)
temp_data_average_num_985 = temp_data.stat.agg({"num_985": "avg"})

temp_data_median_num_100 = temp_data.stat.approxQuantile("num_100", [0.5], 0.25)
temp_data_average_num_100 = temp_data.stat.agg({"num_100": "avg"})

temp_data_median_num_unq = temp_data.stat.approxQuantile("num_unq", [0.5], 0.25)
temp_data_average_num_unq = temp_data.stat.agg({"num_unq": "avg"})


In [None]:
user_logs_clean_date.stat.approxQuantile('num_25', [0.5], 0.25)

In [89]:
user_logs_clean_date_unique_users.show()

KeyboardInterrupt: 

In [None]:
# lets save clean files to csv

In [94]:
user_logs_clean_date.write.format('com.databricks.spark.csv').save('clean_data/user_logs_clean.csv')

In [97]:
transactions_clean_final.write.format('com.databricks.spark.csv').save('clean_data/transaction_clean.csv')

In [98]:
members_clean_date.write.format('com.databricks.spark.csv').save('clean_data/members_clean.csv')

In [100]:
input = spark.read.format('com.databricks.spark.csv').option("delimiter", "\001").option("header", "true").option("nullValue", "null").load("s3a://proyectomineria/data/user_logs/*")

Py4JJavaError: An error occurred while calling o1346.load.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.immutable.List.flatMap(List.scala:355)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
	... 30 more


In [None]:
numeric_features_train = [t[0] for t in df.dtypes if t[1] == 'int']

# 