# Práctica de Spark con BiciMad, aplicamos ML a la base de datos 


In [411]:
#Importaciones Varias
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession
from datetime import datetime
import json
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import *
import matplotlib.pyplot as plt

In [448]:
spark = SparkSession.builder.getOrCreate()
df = spark.read.json('sample_10e3.json')
sc = spark.sparkContext

In [449]:
df.show(5)

+--------------------+--------+-----------+--------------+-------------+----------------+-----------+--------------------+--------------------+---------+--------+
|                 _id|ageRange|idplug_base|idplug_station|idunplug_base|idunplug_station|travel_time|     unplug_hourTime|       user_day_code|user_type|zip_code|
+--------------------+--------+-----------+--------------+-------------+----------------+-----------+--------------------+--------------------+---------+--------+
|{5cf83b752f3843a0...|       0|         21|            66|            8|              90|        219|{2019-06-01T00:00...|e4d55deb9ac172a8d...|        1|        |
|{5cf83b762f3843a0...|       4|         19|           136|           19|              71|        359|{2019-06-01T00:00...|8a0c4123e924a50a9...|        1|   28039|
|{5cf83b762f3843a0...|       4|         17|            38|            7|              39|        375|{2019-06-01T00:00...|a6a9c1f74a6849600...|        1|   28013|
|{5cf83b762f3843a0...|

In [450]:
def get_data(line):
    data = json.loads(line)
    data["_id"] = data["_id"]["$oid"]
    data['unplug_hourTime'] = datetime.strptime(data['unplug_hourTime']['$date'], "%Y-%m-%dT%H:%M:%S.%f%z")
    return data

In [451]:
rdd = sc.textFile('sample_10e2.json').map(get_data)
df = spark.createDataFrame(rdd)


In [475]:
rdd = sc.textFile('sample_10e3.json').map(get_data)
df1 = spark.createDataFrame(rdd)
rdd = sc.textFile('sample_10e4.json').map(get_data)
df2 = spark.createDataFrame(rdd)
df1.printSchema()
df2.printSchema()

root
 |-- _id: string (nullable = true)
 |-- ageRange: long (nullable = true)
 |-- idplug_base: long (nullable = true)
 |-- idplug_station: long (nullable = true)
 |-- idunplug_base: long (nullable = true)
 |-- idunplug_station: long (nullable = true)
 |-- travel_time: long (nullable = true)
 |-- unplug_hourTime: timestamp (nullable = true)
 |-- user_day_code: string (nullable = true)
 |-- user_type: long (nullable = true)
 |-- zip_code: string (nullable = true)

root
 |-- _id: string (nullable = true)
 |-- ageRange: long (nullable = true)
 |-- idplug_base: long (nullable = true)
 |-- idplug_station: long (nullable = true)
 |-- idunplug_base: long (nullable = true)
 |-- idunplug_station: long (nullable = true)
 |-- travel_time: long (nullable = true)
 |-- unplug_hourTime: timestamp (nullable = true)
 |-- user_day_code: string (nullable = true)
 |-- user_type: long (nullable = true)
 |-- zip_code: string (nullable = true)



In [476]:

df1 = df1.unionAll(df2)
df = df.unionAll(df1)
df.count()

11100

In [454]:
df.toPandas()

Unnamed: 0,_id,ageRange,idplug_base,idplug_station,idunplug_base,idunplug_station,travel_time,unplug_hourTime,user_day_code,user_type,zip_code
0,5cf83b752f3843a016be4e2f,0,21,66,8,90,219,2019-06-01,e4d55deb9ac172a8d8f5f0a32599815bd51b7c8760d67e...,1,
1,5cf83b762f3843a016be4e48,4,19,136,19,71,359,2019-06-01,8a0c4123e924a50a958f51985eb71aea750fb072438035...,1,28039
2,5cf83b762f3843a016be4e4f,4,17,38,7,39,375,2019-06-01,a6a9c1f74a68496000542210abc4fc2eba79e2756ad535...,1,28013
3,5cf83b762f3843a016be4e53,5,4,90,21,66,264,2019-06-01,5706c0bd494acc02279d532821c9666b0e506d4f81c838...,1,28009
4,5cf83b762f3843a016be4e54,4,3,166,13,152,367,2019-06-01,eb1b6d32bd4add5d5ff91af72a38786d61075c090383a5...,1,28006
...,...,...,...,...,...,...,...,...,...,...,...
95,5cf83b762f3843a016be4ef2,0,19,129,11,59,818,2019-06-01,f6bd6ac859f4b3ba75eb0a0e64c5d5124595cb6b302869...,1,
96,5cf83b762f3843a016be4ef3,5,8,55,4,135,606,2019-06-01,130cec42450f3b141e2ecd4caeeadfd18867e676e3db24...,1,28026
97,5cf83b762f3843a016be4ef4,0,3,9,5,62,232,2019-06-01,14ece3cc40a508b532589db9bb09158ed4cbe2f9af40f0...,1,
98,5cf83b762f3843a016be4ef5,5,4,77,3,65,301,2019-06-01,fa97bd556babed8fed7ef09a6502c4c4c400d904cfa901...,1,28007


In [455]:
df.count()
df.columns

['_id',
 'ageRange',
 'idplug_base',
 'idplug_station',
 'idunplug_base',
 'idunplug_station',
 'travel_time',
 'unplug_hourTime',
 'user_day_code',
 'user_type',
 'zip_code']

In [456]:
df.dtypes

[('_id', 'string'),
 ('ageRange', 'bigint'),
 ('idplug_base', 'bigint'),
 ('idplug_station', 'bigint'),
 ('idunplug_base', 'bigint'),
 ('idunplug_station', 'bigint'),
 ('travel_time', 'bigint'),
 ('unplug_hourTime', 'timestamp'),
 ('user_day_code', 'string'),
 ('user_type', 'bigint'),
 ('zip_code', 'string')]

In [457]:
df.describe().toPandas()

Unnamed: 0,summary,_id,ageRange,idplug_base,idplug_station,idunplug_base,idunplug_station,travel_time,user_day_code,user_type,zip_code
0,count,100,100.0,100.0,100.0,100.0,100.0,100.0,100,100.0,100.0
1,mean,,2.32,11.35,85.33,11.7,88.85,579.6,,1.0,27897.90740740741
2,stddev,,2.145608623126866,7.638783286689712,47.97517350387878,7.182329607158014,50.51499927264329,251.79501042063416,,0.0,3308.26571603138
3,min,5cf83b752f3843a016be4e2f,0.0,1.0,1.0,1.0,1.0,133.0,01afe1971a4a4fa918461e78fbc9ccd9c4538da0b606bf...,1.0,
4,max,5cf83b762f3843a016be4ef7,6.0,24.0,211.0,24.0,174.0,1264.0,fd44762b424ba560a44add4bae955a7e48343aa0da2fda...,1.0,39316.0


In [467]:
from pyspark.sql.functions import col

dataset = df.select(col('ageRange').cast('float'),
                        col('idunplug_base').cast('float'),
                         col('idunplug_station').cast('float'),
                        col('user_type').cast('float'),
                        col('zip_code'),
                    col('unplug_hourTime'),
                     col('travel_time').cast('float')
                        )
dataset.show()

+--------+-------------+----------------+---------+--------+-------------------+-----------+
|ageRange|idunplug_base|idunplug_station|user_type|zip_code|    unplug_hourTime|travel_time|
+--------+-------------+----------------+---------+--------+-------------------+-----------+
|     0.0|          8.0|            90.0|      1.0|        |2019-06-01 00:00:00|      219.0|
|     4.0|         19.0|            71.0|      1.0|   28039|2019-06-01 00:00:00|      359.0|
|     4.0|          7.0|            39.0|      1.0|   28013|2019-06-01 00:00:00|      375.0|
|     5.0|         21.0|            66.0|      1.0|   28009|2019-06-01 00:00:00|      264.0|
|     4.0|         13.0|           152.0|      1.0|   28006|2019-06-01 00:00:00|      367.0|
|     5.0|          4.0|            55.0|      1.0|   28907|2019-06-01 00:00:00|      174.0|
|     0.0|          6.0|           133.0|      1.0|        |2019-06-01 00:00:00|      308.0|
|     4.0|          3.0|           153.0|      1.0|   28003|2019-06-01

In [459]:
dataset.describe().toPandas()


Unnamed: 0,summary,ageRange,idunplug_base,idunplug_station,user_type,zip_code,travel_time
0,count,100.0,100.0,100.0,100.0,100.0,100.0
1,mean,2.32,11.7,88.85,1.0,27897.90740740741,579.6
2,stddev,2.145608623126866,7.182329607158014,50.51499927264329,0.0,3308.26571603138,251.79501042063416
3,min,0.0,1.0,1.0,1.0,,133.0
4,max,6.0,24.0,174.0,1.0,39316.0,1264.0


In [460]:
from pyspark.sql.functions import isnull, when, count, col
dataset.select([count(when(isnull(c), c)).alias(c) for c in dataset.columns]).show()

+--------+-------------+----------------+---------+--------+---------------+-----------+
|ageRange|idunplug_base|idunplug_station|user_type|zip_code|unplug_hourTime|travel_time|
+--------+-------------+----------------+---------+--------+---------------+-----------+
|       0|            0|               0|        0|       0|              0|          0|
+--------+-------------+----------------+---------+--------+---------------+-----------+



In [461]:
from pyspark.ml.feature import StringIndexer
dataset = StringIndexer(
    inputCol='zip_code', 
    outputCol='CP', 
    handleInvalid='keep').fit(dataset).transform(dataset)
dataset.withColumn("datetype",
    to_date(col('unplug_hourTime'),"yyyy-MM-dd"))
dataset=dataset.select("*", month('unplug_hourTime').alias('month'))

In [463]:
dataset.show()


+--------+-------------+----------------+---------+--------+-------------------+-----------+----+-----+
|ageRange|idunplug_base|idunplug_station|user_type|zip_code|    unplug_hourTime|travel_time|  CP|month|
+--------+-------------+----------------+---------+--------+-------------------+-----------+----+-----+
|     0.0|          8.0|            90.0|      1.0|        |2019-06-01 00:00:00|      219.0| 0.0|    6|
|     4.0|         19.0|            71.0|      1.0|   28039|2019-06-01 00:00:00|      359.0|22.0|    6|
|     4.0|          7.0|            39.0|      1.0|   28013|2019-06-01 00:00:00|      375.0|16.0|    6|
|     5.0|         21.0|            66.0|      1.0|   28009|2019-06-01 00:00:00|      264.0| 2.0|    6|
|     4.0|         13.0|           152.0|      1.0|   28006|2019-06-01 00:00:00|      367.0| 3.0|    6|
|     5.0|          4.0|            55.0|      1.0|   28907|2019-06-01 00:00:00|      174.0|28.0|    6|
|     0.0|          6.0|           133.0|      1.0|        |2019

In [427]:
dataset.describe().toPandas()

Unnamed: 0,summary,ageRange,idunplug_base,idunplug_station,user_type,zip_code,travel_time,CP,month
0,count,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0
1,mean,2.32,11.7,88.85,1.0,27897.90740740741,579.6,5.77,6.0
2,stddev,2.145608623126866,7.182329607158014,50.51499927264329,0.0,3308.26571603138,251.79501042063416,8.349505519744413,0.0
3,min,0.0,1.0,1.0,1.0,,133.0,0.0,6.0
4,max,6.0,24.0,174.0,1.0,39316.0,1264.0,30.0,6.0


In [464]:
# Drop unnecessary columns
dataset = dataset.drop('zip_code')
dataset = dataset.drop('unplug_hourTime')
dataset.show()

+--------+-------------+----------------+---------+-----------+----+-----+
|ageRange|idunplug_base|idunplug_station|user_type|travel_time|  CP|month|
+--------+-------------+----------------+---------+-----------+----+-----+
|     0.0|          8.0|            90.0|      1.0|      219.0| 0.0|    6|
|     4.0|         19.0|            71.0|      1.0|      359.0|22.0|    6|
|     4.0|          7.0|            39.0|      1.0|      375.0|16.0|    6|
|     5.0|         21.0|            66.0|      1.0|      264.0| 2.0|    6|
|     4.0|         13.0|           152.0|      1.0|      367.0| 3.0|    6|
|     5.0|          4.0|            55.0|      1.0|      174.0|28.0|    6|
|     0.0|          6.0|           133.0|      1.0|      308.0| 0.0|    6|
|     4.0|          3.0|           153.0|      1.0|      462.0| 7.0|    6|
|     0.0|         14.0|            44.0|      1.0|      482.0| 0.0|    6|
|     4.0|         21.0|            85.0|      1.0|      480.0| 8.0|    6|
|     5.0|         18.0| 

In [465]:
# Assemble all the features with VectorAssembler
required_features = ['idunplug_base',
 'idunplug_station',
 'user_type',
 'travel_time','month','CP']
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=required_features, outputCol='features')
transformed_data = assembler.transform(dataset)

In [466]:
transformed_data.describe()


DataFrame[summary: string, ageRange: string, idunplug_base: string, idunplug_station: string, user_type: string, travel_time: string, CP: string, month: string]

In [468]:
(training_data, test_data) = transformed_data.randomSplit([0.8,0.2])

In [469]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol='ageRange', 
                            featuresCol='features',
                            maxDepth=5)

DataFrame[ageRange: float, idunplug_base: float, idunplug_station: float, user_type: float, travel_time: float, CP: double, month: int, features: vector]

In [471]:
model = rf.fit(training_data)


In [472]:
predictions = model.transform(test_data)

In [473]:
# Evaluate our model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol='ageRange', 
    predictionCol='prediction', 
    metricName='accuracy')

In [474]:
accuracy = evaluator.evaluate(predictions)
print('Test Accuracy = ', accuracy)

Test Accuracy =  0.7368421052631579


### Como se puede observar, la precisión es bastante alta, luego aunque no se sacan resultados concluyentes para predecir el tiempo de viaje, si que se puede predecir con alta fiabilidad la edad del usuario según el uso que realice.