## Creando el entorno de PySpark

In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 78kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 23.1MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=bdcd46dabda8b4c29931c4f8e8226f7ac2a5a0e9d4eae70c0ae37a2054f6d1ca
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark import SQLContext

spark = SparkSession.builder.getOrCreate()

In [4]:
spark = SparkSession \
    .builder \
    .appName("how to read csv file") \
    .getOrCreate()

In [5]:
spark

## Cargando la Data

In [6]:
df = spark.read.csv("/content/drive/MyDrive/train.csv", inferSchema=True, header=True)

Observamos los datos.

In [7]:
df.show()

+--------------------+-----------+--------------------+----------------+---------------+-----------------+----------------+---------------+
|                 key|fare_amount|     pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|
+--------------------+-----------+--------------------+----------------+---------------+-----------------+----------------+---------------+
|2009-06-15 17:26:...|        4.5|2009-06-15 17:26:...|      -73.844311|      40.721319|        -73.84161|       40.712278|              1|
|2010-01-05 16:52:...|       16.9|2010-01-05 16:52:...|      -74.016048|      40.711303|       -73.979268|       40.782004|              1|
|2011-08-18 00:35:...|        5.7|2011-08-18 00:35:...|      -73.982738|       40.76127|       -73.991242|       40.750562|              2|
|2012-04-21 04:30:...|        7.7|2012-04-21 04:30:...|       -73.98713|      40.733143|       -73.991567|       40.758092|              1|
|2010-03-09 07:51:..

## Eliminando variables irrelevantes

Eliminamos variables que no aportan ninguna información.

In [8]:
df = df.drop('key')

## Analizando las variables

In [9]:
df.printSchema()

root
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



In [10]:
print('Filas: ',df.count())
print('Columnas: ',len(df.columns))

Filas:  55423856
Columnas:  7


## Resumen estadístico de todas las variables

In [11]:
df.describe().show()

+-------+------------------+--------------------+------------------+-----------------+------------------+------------------+------------------+
|summary|       fare_amount|     pickup_datetime|  pickup_longitude|  pickup_latitude| dropoff_longitude|  dropoff_latitude|   passenger_count|
+-------+------------------+--------------------+------------------+-----------------+------------------+------------------+------------------+
|  count|          55423856|            55423856|          55423856|         55423856|          55423480|          55423480|          55423856|
|   mean|11.345045601663852|                null|-72.50968444358729| 39.9197917868882|-72.51120972971809|39.920681444828844|1.6853799201556816|
| stddev|  20.7108321982325|                null| 12.84888338140265|9.642353041994934|12.782196517830771| 9.633345796415124|1.3276643570959683|
|    min|            -300.0|2009-01-01 00:00:...|      -3442.059565|     -3492.263768|      -3442.024565|      -3547.886698|            

## Analizando valores nulos

In [12]:
import pyspark.sql.functions as F
def count_missings(spark_df,sort=True):
    """
    Counts number of nulls and nans in each column
    """
    df = spark_df.select([F.count(F.when(F.isnan(c) | F.isnull(c), c)).alias(c) for (c,c_type) in spark_df.dtypes if c_type not in ('timestamp', 'string', 'date')]).toPandas()

    if len(df) == 0:
        print("There are no any missing values!")
        return None

    if sort:
        return df.rename(index={0: 'count'}).T.sort_values("count",ascending=False)

    return df

In [13]:
count_missings(df)

Unnamed: 0,count
dropoff_longitude,376
dropoff_latitude,376
fare_amount,0
pickup_longitude,0
pickup_latitude,0
passenger_count,0


In [14]:
df = df.dropna(thresh=2,subset=('dropoff_longitude','dropoff_latitude'))

In [15]:
count_missings(df)

Unnamed: 0,count
fare_amount,0
pickup_longitude,0
pickup_latitude,0
dropoff_longitude,0
dropoff_latitude,0
passenger_count,0


## Extrayendo una muestra de toda la data

In [16]:
df_clean = df.dropDuplicates()

In [17]:
print('Filas: ',df.count())
print('Columnas: ',len(df.columns))

Filas:  55423480
Columnas:  7


In [18]:
print('Filas: ',df_clean.count())
print('Columnas: ',len(df_clean.columns))

Filas:  55421830
Columnas:  7


In [19]:
df_clean.describe().show()

+-------+------------------+--------------------+------------------+-----------------+------------------+------------------+------------------+
|summary|       fare_amount|     pickup_datetime|  pickup_longitude|  pickup_latitude| dropoff_longitude|  dropoff_latitude|   passenger_count|
+-------+------------------+--------------------+------------------+-----------------+------------------+------------------+------------------+
|  count|          55421830|            55421830|          55421830|         55421830|          55421830|          55421830|          55421830|
|   mean|11.345105672079614|                null|-72.51180399869492|39.92095839539886|-72.51328720212507|39.921825055217475| 1.685399345348214|
| stddev| 20.71113115840584|                null|12.843106230294701|9.640101311296613|12.776488446560032|  9.63111727923394|1.3273802146256042|
|    min|            -300.0|2009-01-01 00:00:...|      -3442.059565|     -3492.263768|      -3442.024565|      -3547.886698|            

## Creando muestra

In [20]:
muestra = df_clean.sample(withReplacement=None, fraction=0.01, seed=1)

In [21]:
muestra.describe().show()

+-------+------------------+--------------------+------------------+-----------------+------------------+------------------+-----------------+
|summary|       fare_amount|     pickup_datetime|  pickup_longitude|  pickup_latitude| dropoff_longitude|  dropoff_latitude|  passenger_count|
+-------+------------------+--------------------+------------------+-----------------+------------------+------------------+-----------------+
|  count|            553125|              553125|            553125|           553125|            553125|            553125|           553125|
|   mean|11.338296135593234|                null|-72.51648326642628|39.91164854951288| -72.5331367419359|39.939218256523176|1.686257175141243|
| stddev| 9.754911876627387|                null|13.046863641099794|10.86421756656523|13.182795790400238| 8.426033157559031|1.308609713886857|
|    min|             -45.0|2009-01-01 00:07:...|      -2972.263298|     -3401.391497|      -2633.905475|      -2166.852388|                0|

## Exportamos los datos

In [22]:
muestra.toPandas().to_csv('/content/drive/MyDrive/muestra.csv')