In [1]:
from pyspark import SparkContext,HiveContext
from pyspark.sql import SQLContext,SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

import os
import subprocess
import pandas as pd

from functools import reduce
from datetime import date, datetime


In [2]:
now = datetime.now()
now

datetime.datetime(2021, 6, 20, 19, 0, 21, 34144)

In [3]:
SparkContext.setSystemProperty('spark.executor.memory', '11468m')
SparkContext.setSystemProperty('spark.executor.instances', '6')

In [4]:
sparkSession = SparkSession.builder.appName("bigdatita").getOrCreate()

In [5]:
sparkSession.sparkContext.getConf().getAll()

[('spark.ui.proxyBase', '/proxy/application_1624138385244_0006'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.dynamicAllocation.minExecutors', '1'),
 ('spark.app.startTime', '1624215623358'),
 ('spark.sql.warehouse.dir', 'file:/spark-warehouse'),
 ('spark.eventLog.dir',
  'gs://dataproc-temp-us-central1-749534659025-ywijjblb/100dc73e-522d-450b-8df4-b014a698738b/spark-job-history'),
 ('spark.yarn.am.memory', '640m'),
 ('spark.driver.host',
  'scala-practice-m.us-central1-a.c.future-surge-316401.internal'),
 ('spark.executor.instances', '6'),
 ('spark.executor.memory', '11468m'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.yarn.unmanagedAM.enabled', 'true'),
 ('spark.sql.autoBroadcastJoinThreshold', '43m'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.appUIAddress',
  'http://scala-practice-m.us-central1-a.c.future-surge-316401.internal:39239'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
 ('spark.sql.cbo.joinReorder.en

In [6]:
df = sparkSession.read.csv('hdfs:///user/jorge/practica/data_raw.csv', inferSchema=False, header=True)

In [7]:
df.printSchema()

root
 |-- trip_id: string (nullable = true)
 |-- usertype: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- stoptime: string (nullable = true)
 |-- tripduration: string (nullable = true)
 |-- from_station_id: string (nullable = true)
 |-- from_station_name: string (nullable = true)
 |-- latitude_start: string (nullable = true)
 |-- longitude_start: string (nullable = true)
 |-- dpcapacity_start: string (nullable = true)
 |-- to_station_id: string (nullable = true)
 |-- to_station_name: string (nullable = true)
 |-- latitude_end: string (nullable = true)
 |-- longitude_end: string (nullable = true)
 |-- dpcapacity_end: string (nullable = true)
 |-- temperature: string (nullable = true)
 |-- windchill: string (nullable = true)
 |-- dewpoint: string (nullable = true)
 |-- humidity: string (nullable = true)
 |-- pressure: string (nullable = true)
 |-- visibility: string (nullable = true)
 |-- wind_speed: string (nullable = true)
 

In [8]:
df = df.withColumn('trip_id', F.col('trip_id').cast('int'))
df = df.withColumn('starttime', F.to_timestamp(F.col('starttime')))
df = df.withColumn('stoptime', F.to_timestamp(F.col('stoptime')))
df = df.withColumn('tripduration', F.col('tripduration').cast('int'))
df = df.withColumn('from_station_id', F.col('from_station_id').cast('int'))
df = df.withColumn('latitude_start', F.col('latitude_start').cast('double'))
df = df.withColumn('longitude_start', F.col('longitude_start').cast('double'))
df = df.withColumn('dpcapacity_start', F.col('dpcapacity_start').cast('int'))
df = df.withColumn('to_station_id', F.col('to_station_id').cast('int'))
df = df.withColumn('latitude_end', F.col('latitude_start').cast('double'))
df = df.withColumn('longitude_end', F.col('longitude_start').cast('double'))
df = df.withColumn('dpcapacity_end', F.col('dpcapacity_start').cast('int'))


In [9]:
df = df.drop('temperature')
df = df.drop('usertype')
df = df.drop('gender')
df = df.drop('stoptime')
df = df.drop('from_station_name')
df = df.drop('latitude_start')
df = df.drop('longitude_start')
df = df.drop('dpcapacity_start')
df = df.drop('to_station_name')
df = df.drop('latitude_end')
df = df.drop('longitude_end')
df = df.drop('dpcapacity_end')
df = df.drop('windchill')
df = df.drop('dewpoint')
df = df.drop('humidity')
df = df.drop('pressure')
df = df.drop('visibility')
df = df.drop('wind_speed')
df = df.drop('precipitation')
df = df.drop('events')
df = df.drop('rain')
df = df.drop('conditions')
df = df.drop('trip_id')

#this is another test change we should not pay attention to it

In [10]:
df.printSchema()

root
 |-- starttime: timestamp (nullable = true)
 |-- tripduration: integer (nullable = true)
 |-- from_station_id: integer (nullable = true)
 |-- to_station_id: integer (nullable = true)



In [11]:
df = df.withColumn('week', F.weekofyear(F.col('starttime')))
df = df.withColumn('year', F.year(F.col('starttime')))

In [12]:
weeks = df.select('week', 'year').distinct().orderBy('year', 'week')

In [13]:
weeks = weeks.filter( "NOT (week = 1 and year = 2013)")

In [14]:
weeks = weeks.withColumn('week_id', F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))

In [15]:
weeks = weeks.withColumn('ancla', F.concat(F.col('year'), F.col('week')))

In [16]:
weeks.show(5)

+----+----+-------+------+
|week|year|week_id| ancla|
+----+----+-------+------+
|  26|2013|      1|201326|
|  27|2013|      2|201327|
|  28|2013|      3|201328|
|  29|2013|      4|201329|
|  30|2013|      5|201330|
+----+----+-------+------+
only showing top 5 rows



In [17]:
df = df.join(weeks, ['week', 'year'], 'inner')

In [18]:
df.printSchema()

root
 |-- week: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- tripduration: integer (nullable = true)
 |-- from_station_id: integer (nullable = true)
 |-- to_station_id: integer (nullable = true)
 |-- week_id: integer (nullable = true)
 |-- ancla: string (nullable = true)



In [19]:
semi = 2
semf = df.agg({'week_id':'max'}).collect()[0][0]
vobs = 12
vdes = 1
step = 3
anclai = semi + vobs -1
anclaf = semf - vdes
anclai, anclaf

(13, 236)

In [20]:
def ing(df,k,ancla):
    aux = df.filter((df['week_id']>=(ancla-k+1))&(df['week_id']<=ancla))
    expr = [y(F.col('tripduration')).alias(f'x_{z}_{k}') for y,z in zip([F.mean, F.count],
                                                             ['media', 'conteo'])]
    aux = aux.groupBy('from_station_id', 'to_station_id').agg(*expr).withColumn('week_id',F.lit(ancla))
    return aux

In [24]:
def tgt(df,ancla):
    aux = df.filter((df['week_id']==(ancla+vdes)))
    expr = [F.count(F.col('tripduration')).alias('y')]
    aux = aux.select('from_station_id', 'to_station_id', 'tripduration').groupby('from_station_id', 'to_station_id').agg(*expr)
    aux = aux.withColumn('week_id',F.lit(ancla))
    return aux

In [25]:
um = ['from_station_id', 'to_station_id', 'week_id']
ancla = 200
aux = reduce(lambda x,y:x.join(y,um,'outer'),
         map(lambda k:ing(df,k,ancla),
             range(step,vobs+step,step))).join(
                 tgt(df,ancla),
                   um,
                   how='inner')
aux = aux.withColumn('ruta', F.concat(F.col('from_station_id'), F.lit('|'), F.col('to_station_id')))
aux = aux.join(weeks, 'week_id', how='inner')
aux = aux.drop('week_id', 'year', 'week','from_station_id', 'to_station_id')

aux.select(
    'ruta', 'ancla', 
    'x_conteo_3', 'x_conteo_6', 'x_conteo_9', 'x_conteo_12',
    'x_media_3', 'x_media_6', 'x_media_9', 'x_media_12',
    'y'
).write.csv('/user/jorge/results/pred_meantest', mode='overwrite', header=True)

In [26]:
aux.printSchema()

root
 |-- x_media_3: double (nullable = true)
 |-- x_conteo_3: long (nullable = true)
 |-- x_media_6: double (nullable = true)
 |-- x_conteo_6: long (nullable = true)
 |-- x_media_9: double (nullable = true)
 |-- x_conteo_9: long (nullable = true)
 |-- x_media_12: double (nullable = true)
 |-- x_conteo_12: long (nullable = true)
 |-- y: long (nullable = false)
 |-- ruta: string (nullable = true)
 |-- ancla: string (nullable = true)



In [None]:
os.system('hdfs dfs -rm /user/jorge/results/pred_mean.csv/p*')

In [27]:
um = ['from_station_id', 'to_station_id', 'week_id']

In [None]:
for ancla in range(anclai,anclaf+1):
    aux = reduce(lambda x,y:x.join(y,um,'outer'),
             map(lambda k:ing(df,k,ancla),
                 range(step,vobs+step,step))).join(
                     tgt(df,ancla),
                       um,
                       how='inner')
    aux = aux.withColumn('ruta', F.concat(F.col('from_station_id'), F.lit('|'), F.col('to_station_id')))
    aux = aux.join(weeks, 'week_id', how='inner')
    aux = aux.drop('week_id', 'year', 'week','from_station_id', 'to_station_id')

    aux.select(
        'ruta', 'ancla', 
        'x_conteo_3', 'x_conteo_6', 'x_conteo_9', 'x_conteo_12',
        'x_media_3', 'x_media_6', 'x_media_9', 'x_media_12',
        'y'
    ).write.csv('/user/jorge/results/pred_mean.csv', mode='append')

In [None]:
os.system("hdfs dfs -cat /user/jorge/results/pred_mean.csv/p* > final_res.csv")

0

In [None]:
os.system("hdfs dfs -put -f /final_res.csv /user/jorge/results/")

256

In [35]:
print('finish')

finish


In [58]:
del df

In [59]:
ss2 = SparkSession.builder.appName("bigdatita-regr").getOrCreate()

In [60]:
df = ss2.read.csv('/user/jorge/results/final_res.csv', inferSchema=False, header=False)

In [61]:
df.show(5)

+----+------+---+---+---+---+------------------+------------------+------------------+------------------+----+
| _c0|   _c1|_c2|_c3|_c4|_c5|               _c6|               _c7|               _c8|               _c9|_c10|
+----+------+---+---+---+---+------------------+------------------+------------------+------------------+----+
| 2|2|201719| 42|150|172|194|3284.1666666666665|3846.6133333333332|3614.6686046511627| 3596.082474226804|  26|
| 2|3|201719| 23| 47| 55| 78|1102.4347826086957|  978.063829787234| 948.4727272727273| 950.0128205128206|  19|
|2|26|201719| 11| 21| 26| 26| 679.3636363636364| 1271.095238095238|1754.1153846153845|1754.1153846153845|   4|
|2|45|201719|  1|  7|  7| 10|            2519.0| 2187.714285714286| 2187.714285714286|            1729.6|   3|
|2|49|201719|  2|  2|  5|  5|             612.5|             612.5|            1682.6|            1682.6|   1|
+----+------+---+---+---+---+------------------+------------------+------------------+------------------+----+
o

In [62]:
cols = ['ruta', 'ancla', 
              'x_num_tot_viajes_3', 'x_num_tot_viajes_6', 'x_num_tot_viajes_9', 'x_num_tot_viajes_12', 
              'x_duracion_prom_viaje_3', 'x_duracion_prom_viaje_6', 'x_duracion_prom_viaje_9', 'x_duracion_prom_viaje_12', 
              'y']

In [63]:
for idx, col in enumerate(cols):
    df = df.withColumnRenamed(f'_c{idx}', col)

In [64]:
df.printSchema()

root
 |-- ruta: string (nullable = true)
 |-- ancla: string (nullable = true)
 |-- x_num_tot_viajes_3: string (nullable = true)
 |-- x_num_tot_viajes_6: string (nullable = true)
 |-- x_num_tot_viajes_9: string (nullable = true)
 |-- x_num_tot_viajes_12: string (nullable = true)
 |-- x_duracion_prom_viaje_3: string (nullable = true)
 |-- x_duracion_prom_viaje_6: string (nullable = true)
 |-- x_duracion_prom_viaje_9: string (nullable = true)
 |-- x_duracion_prom_viaje_12: string (nullable = true)
 |-- y: string (nullable = true)



In [65]:
df = df.withColumn('x_num_tot_viajes_3', F.col('x_num_tot_viajes_3').cast('int'))
df = df.withColumn('x_num_tot_viajes_6', F.col('x_num_tot_viajes_6').cast('int'))
df = df.withColumn('x_num_tot_viajes_9', F.col('x_num_tot_viajes_9').cast('int'))
df = df.withColumn('x_num_tot_viajes_12', F.col('x_num_tot_viajes_12').cast('int'))
df = df.withColumn('x_duracion_prom_viaje_3', F.col('x_duracion_prom_viaje_3').cast('int'))
df = df.withColumn('x_duracion_prom_viaje_6', F.col('x_duracion_prom_viaje_6').cast('int'))
df = df.withColumn('x_duracion_prom_viaje_9', F.col('x_duracion_prom_viaje_9').cast('int'))
df = df.withColumn('x_duracion_prom_viaje_12', F.col('x_duracion_prom_viaje_12').cast('int'))
df = df.withColumn('y', F.col('y').cast('int'))

In [66]:
train, test = df.randomSplit([0.7, 0.3])

In [67]:
train

DataFrame[ruta: string, ancla: string, x_num_tot_viajes_3: int, x_num_tot_viajes_6: int, x_num_tot_viajes_9: int, x_num_tot_viajes_12: int, x_duracion_prom_viaje_3: int, x_duracion_prom_viaje_6: int, x_duracion_prom_viaje_9: int, x_duracion_prom_viaje_12: int, y: int]

In [68]:
x_cols = ['x_num_tot_viajes_3', 'x_num_tot_viajes_6', 
          'x_num_tot_viajes_9', 'x_num_tot_viajes_12', 
          'x_duracion_prom_viaje_3', 'x_duracion_prom_viaje_6', 
          'x_duracion_prom_viaje_9', 'x_duracion_prom_viaje_12']

In [69]:
assembler = VectorAssembler(inputCols=x_cols, outputCol='features', handleInvalid='skip')

In [70]:
v = assembler.transform(train)

In [71]:
v.show(5)

+-------+------+------------------+------------------+------------------+-------------------+-----------------------+-----------------------+-----------------------+------------------------+---+--------------------+
|   ruta| ancla|x_num_tot_viajes_3|x_num_tot_viajes_6|x_num_tot_viajes_9|x_num_tot_viajes_12|x_duracion_prom_viaje_3|x_duracion_prom_viaje_6|x_duracion_prom_viaje_9|x_duracion_prom_viaje_12|  y|            features|
+-------+------+------------------+------------------+------------------+-------------------+-----------------------+-----------------------+-----------------------+------------------------+---+--------------------+
|100|103|201534|                 1|                 1|                 1|                  1|                    666|                    666|                    666|                     666|  1|[1.0,1.0,1.0,1.0,...|
|100|103|201536|                 1|                 2|                 2|                  2|                    986|                   

In [72]:
v.count()

2316196

In [73]:
model = LinearRegression(featuresCol='features', labelCol='y')

In [74]:
model = model.fit(v)

In [75]:
vt = assembler.transform(test)

In [76]:
ev = RegressionEvaluator(predictionCol='prediction',labelCol='y',metricName='mae')

In [77]:
ev.evaluate(model.transform(vt).select('features','y','prediction'))

1.6158255080070592

In [78]:
ev.evaluate(model.transform(v).select('features','y','prediction'))

1.6157694249630519

In [57]:
model.transform(v).show(5)

+-------+------+------------------+------------------+------------------+-------------------+-----------------------+-----------------------+-----------------------+------------------------+----+--------------------+------------------+
|   ruta| ancla|x_num_tot_viajes_3|x_num_tot_viajes_6|x_num_tot_viajes_9|x_num_tot_viajes_12|x_duracion_prom_viaje_3|x_duracion_prom_viaje_6|x_duracion_prom_viaje_9|x_duracion_prom_viaje_12|   y|            features|        prediction|
+-------+------+------------------+------------------+------------------+-------------------+-----------------------+-----------------------+-----------------------+------------------------+----+--------------------+------------------+
|100|103|201534|                 1|                 1|                 1|                  1|                    666|                    666|                    666|                     666| 986|[1.0,1.0,1.0,1.0,...| 860.7657453393729|
|100|103|201724|                 2|                 2|  

In [80]:
res_v = assembler.transform(df)

In [81]:
res = model.transform(res_v)

In [82]:
res.show(5)

+----+------+------------------+------------------+------------------+-------------------+-----------------------+-----------------------+-----------------------+------------------------+---+--------------------+------------------+
|ruta| ancla|x_num_tot_viajes_3|x_num_tot_viajes_6|x_num_tot_viajes_9|x_num_tot_viajes_12|x_duracion_prom_viaje_3|x_duracion_prom_viaje_6|x_duracion_prom_viaje_9|x_duracion_prom_viaje_12|  y|            features|        prediction|
+----+------+------------------+------------------+------------------+-------------------+-----------------------+-----------------------+-----------------------+------------------------+---+--------------------+------------------+
| 2|2|201719|                42|               150|               172|                194|                   3284|                   3846|                   3614|                    3596| 26|[42.0,150.0,172.0...|  20.1285845001129|
| 2|3|201719|                23|                47|                55|  

In [83]:
res.columns

['ruta',
 'ancla',
 'x_num_tot_viajes_3',
 'x_num_tot_viajes_6',
 'x_num_tot_viajes_9',
 'x_num_tot_viajes_12',
 'x_duracion_prom_viaje_3',
 'x_duracion_prom_viaje_6',
 'x_duracion_prom_viaje_9',
 'x_duracion_prom_viaje_12',
 'y',
 'features',
 'prediction']

In [88]:
res.select(['ruta',
 'ancla',
 'x_num_tot_viajes_3',
 'x_num_tot_viajes_6',
 'x_num_tot_viajes_9',
 'x_num_tot_viajes_12',
 'x_duracion_prom_viaje_3',
 'x_duracion_prom_viaje_6',
 'x_duracion_prom_viaje_9',
 'x_duracion_prom_viaje_12',
 'prediction']).toPandas().to_csv('resultadisimo.csv', index=False)