In [1]:
!pip install pyspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=e949020c23c667bcc7cad34cfb6e2abd95c3a61518de26d8098afd499bc50cf0
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [41]:
import pyspark.pandas as ps
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
import time
from pyspark.sql.types import StructType
from pyspark.sql.types import *
from pyspark.sql.functions import unix_timestamp, datediff, to_timestamp, expr, max, min

In [4]:
spark = SparkSession.builder.appName("BigDataProject").getOrCreate()

#Pre-procesamiento de Data

In [157]:
taxifile = "/content/drive/MyDrive/BigDataP/yellow_tripdata_2018-04.parquet"

df = spark.read.parquet(taxifile)
df.head()

Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 4, 1, 0, 22, 20), tpep_dropoff_datetime=datetime.datetime(2018, 4, 1, 0, 22, 26), passenger_count=1, trip_distance=0.0, RatecodeID=1, store_and_fwd_flag='N', PULocationID=145, DOLocationID=145, payment_type=2, fare_amount=2.5, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=3.8, congestion_surcharge=None, airport_fee=None)

### Eliminando columnas innecesarias

In [158]:
df_clear = df.drop("PULocationID", "DOLocationID", "extra", "store_and_fwd_flag", "RatecodeID", "improvement_surcharge", "congestion_surcharge", "mta_tax", "tip_amount", "tolls_amount", "total_amount", "VendorID", "airport_fee","passenger_count", "payment_type")
df_clear.head()

Row(tpep_pickup_datetime=datetime.datetime(2018, 4, 1, 0, 22, 20), tpep_dropoff_datetime=datetime.datetime(2018, 4, 1, 0, 22, 26), trip_distance=0.0, fare_amount=2.5)

In [160]:
df_clear = df_clear.withColumn("tpep_pickup_datetime", to_timestamp("tpep_pickup_datetime"))
df_clear = df_clear.withColumn("tpep_dropoff_datetime", to_timestamp("tpep_dropoff_datetime"))

### Generando la columna de tiempo total del viaje

In [161]:
df_clear = df_clear.withColumn("trip_time",expr("unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)")/60.0)

In [162]:
df_clear = df_clear.drop("tpep_pickup_datetime","tpep_dropoff_datetime")

### Limpieza de datos

In [163]:
df_clear = df_clear.dropna()

In [164]:
df_clear.show(10)

+-------------+-----------+-------------------+
|trip_distance|fare_amount|          trip_time|
+-------------+-----------+-------------------+
|          0.0|        2.5|                0.1|
|          6.7|       22.5| 21.083333333333332|
|          4.1|       15.5|              15.65|
|          0.7|        5.5|  5.266666666666667|
|          0.0|        2.5|0.08333333333333333|
|          2.5|       11.5| 12.683333333333334|
|          1.8|        8.5|  7.916666666666667|
|          0.9|        5.5|               5.05|
|          1.6|        8.5|                8.9|
|          1.1|        5.5|  4.166666666666667|
+-------------+-----------+-------------------+
only showing top 10 rows



In [166]:
df_clear = df_clear[df_clear["trip_distance"]>0.0]
df_clear = df_clear[df_clear["trip_time"]>0.0]
df_clear = df_clear[df_clear["fare_amount"]>0.0]
df_clear.show(10)

+-------------+-----------+------------------+
|trip_distance|fare_amount|         trip_time|
+-------------+-----------+------------------+
|          6.7|       22.5|21.083333333333332|
|          4.1|       15.5|             15.65|
|          0.7|        5.5| 5.266666666666667|
|          2.5|       11.5|12.683333333333334|
|          1.8|        8.5| 7.916666666666667|
|          0.9|        5.5|              5.05|
|          1.6|        8.5|               8.9|
|          1.1|        5.5| 4.166666666666667|
|          0.8|        4.5|3.3666666666666667|
|          1.0|        6.0| 5.933333333333334|
+-------------+-----------+------------------+
only showing top 10 rows



In [167]:
df_clear.select(min(df_clear.trip_distance).alias("Min: trip_distance"),
                min(df_clear.trip_time).alias("Min: trip_time"),
                min(df_clear.fare_amount).alias("Min: fare_amount")
          ).show()

df_clear.select(max(df_clear.trip_distance).alias("Max: trip_distance"),
                max(df_clear.trip_time).alias("Max: trip_time"),
                max(df_clear.fare_amount).alias("Max: fare_amount")
          ).show()


+------------------+--------------------+----------------+
|Min: trip_distance|      Min: trip_time|Min: fare_amount|
+------------------+--------------------+----------------+
|              0.01|0.016666666666666666|            0.01|
+------------------+--------------------+----------------+

+------------------+------------------+----------------+
|Max: trip_distance|    Max: trip_time|Max: fare_amount|
+------------------+------------------+----------------+
|             943.5|2002.8333333333333|          6023.0|
+------------------+------------------+----------------+



# Modelo

### Entrenamiento

In [168]:
assembler = VectorAssembler(
    inputCols=["trip_distance", "trip_time"],
    outputCol="features"
)
data = assembler.transform(df_clear)

data = data.select(["features", "fare_amount"])

In [169]:
training_data, testing_data = data.randomSplit([0.8, 0.2], seed=42)

In [170]:
data.show(10)

+--------------------+-----------+
|            features|fare_amount|
+--------------------+-----------+
|[6.7,21.083333333...|       22.5|
|         [4.1,15.65]|       15.5|
|[0.7,5.2666666666...|        5.5|
|[2.5,12.683333333...|       11.5|
|[1.8,7.9166666666...|        8.5|
|          [0.9,5.05]|        5.5|
|           [1.6,8.9]|        8.5|
|[1.1,4.1666666666...|        5.5|
|[0.8,3.3666666666...|        4.5|
|[1.0,5.9333333333...|        6.0|
+--------------------+-----------+
only showing top 10 rows



In [171]:
linearR = LinearRegression(featuresCol="features",labelCol="fare_amount")
start_time = time.time()
model = linearR.fit(data)
end_time = time.time()

df_clear.count()
print(end_time-start_time)
print(model.summary.rootMeanSquaredError)
print(model.summary.r2)

27.888118028640747
4.6715953439049835
0.8335501410418094


### Testeo

In [172]:
predictions = model.transform(testing_data)
evaluator = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="rmse")
metric = evaluator.evaluate(predictions)

In [None]:
predictions.select("features", "fare_amount", "prediction").show()