# Pasos para configurar Spark en local
1. Ensure that Java is installed; otherwise install Java.
2. Download the latest version of Apache Spark from
https://spark.apache.org/downloads.html.
Extrae el archivo `.tgz` (y luego el `.tar` que contiene) hasta que quede solo la carpeta con los archivos de Spark.
3. Extract the files from the zipped folder.
4. Copy all the Spark-related files to their respective directory.
Además, si estás en Windows, debes añadir el `winutils.exe` correspondiente a tu versión de Hadoop en la carpeta `bin` de Spark, p. ej.: `C:\spark\spark-2.4.4-bin-hadoop2.7\bin`.
5. Configure the environment variables to be able to run Spark (variables de entorno globales):
   ```
   SPARK_HOME  = C:\spark\spark-2.4.4-bin-hadoop2.7
   HADOOP_HOME = C:\spark\spark-2.4.4-bin-hadoop2.7
   ```
6. Verify the installation and run Spark using the following cells.

In [1]:
# Locate the local Spark distribution with findspark.
# NOTE(review): findspark.init() with no arguments presumably relies on the
# SPARK_HOME variable configured in step 5 — confirm against findspark docs.
import findspark

findspark.init()
# Returns the Spark home directory that was found (shown as the cell output).
findspark.find()


'C:\\spark\\spark-2.4.4-bin-hadoop2.7'

In [2]:
# These two cells check the Spark configuration on Windows: if no error is
# raised, the setup is correct.
# NOTE(review): the original note said they are not needed for pyspark to work,
# but findspark.init() may be what makes `import pyspark` succeed when pyspark is
# not pip-installed, and later cells may reference the `pyspark` name imported
# here — confirm before skipping these cells.
import pyspark
findspark.find()

'C:\\spark\\spark-2.4.4-bin-hadoop2.7'

## CARGA DE PYSPARK

In [3]:
import pandas as pd
import numpy as np

In [4]:
# Libraries needed for the Spark session.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Use the names imported just above instead of `pyspark.SparkConf` /
# `pyspark.SparkContext`: those depended on `import pyspark` from an earlier
# "optional" check cell and raised NameError when that cell was skipped.
conf = SparkConf().setAppName('appName').setMaster('local')
# getOrCreate returns the already-active context when this cell is re-run,
# instead of failing because a SparkContext already exists. In that case
# `conf` is ignored.
sc = SparkContext.getOrCreate(conf=conf)
# pyspark.sql.SparkSession is the main entry point for DataFrame and SQL functionality.
spark = SparkSession(sc)
# To end the session call sc.stop().


In [5]:
# Carga de librerias para pyspark FUNCIONES SQL y TIPOS DE DATOS
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime

In [6]:
# Convert dates to timestamps; this version is meant for pandas_udf, which works
# on whole pandas Series (vectorized) instead of one row at a time.
# NOTE: per the Spark docs, Arrow-based conversion does not support MapType,
# ArrayType of TimestampType or nested StructType, and BinaryType needs
# PyArrow >= 0.10.0.

def date_convert(date_str):  # NOTE(review): reported to raise errors inside pandas_udf — still to be investigated
    """Parse ``date_str`` with the format ``'%m/%d/%Y %H:%M:%S'``.

    Args:
        date_str: a date string, or a pandas Series of date strings.

    Returns:
        A ``pd.Timestamp`` for a string, or a datetime64 Series for a Series.

    Raises:
        ValueError: if the input cannot be parsed with the format.

    Input with extra characters after the seconds field (pandas reports
    "unconverted data remains") is parsed again with ``exact=False`` so the
    trailing characters are ignored.
    """
    date_format = '%m/%d/%Y %H:%M:%S'
    try:
        return pd.to_datetime(date_str, errors='raise', format=date_format)
    except ValueError as err:
        # The exact message wording may differ between pandas versions, so
        # match the common substring rather than a fixed prefix.
        if 'unconverted data remains' not in str(err):
            raise
    # The previous fallback sliced a result variable that was never assigned
    # (UnboundLocalError). exact=False lets the format match anywhere in the
    # string, which drops the trailing data pandas complained about.
    return pd.to_datetime(
        date_str, errors='raise', format=date_format, exact=False
    )
# Wrap date_convert as a scalar (vectorized) pandas_udf whose output column is a Timestamp.
# NOTE(review): if this UDF errors at runtime on Spark 2.4 with PyArrow >= 0.15,
# it is possibly the Arrow IPC format change that requires setting
# ARROW_PRE_0_15_IPC_FORMAT=1 — confirm.
date_convert_pudf = F.pandas_udf(date_convert, returnType=TimestampType())

In [7]:
# Convert a date string to a timestamp (row-at-a-time version, for F.udf).
def date_convert(date_str):
    """Parse one date string with the format ``'%m/%d/%Y %H:%M:%S'``.

    Args:
        date_str: the date string to parse.

    Returns:
        A ``pd.Timestamp``.

    Raises:
        ValueError: if the string cannot be parsed with the format.

    Strings with extra characters after the seconds field (pandas reports
    "unconverted data remains") are parsed again with ``exact=False`` so the
    trailing characters are ignored.
    """
    date_format = '%m/%d/%Y %H:%M:%S'
    try:
        return pd.to_datetime(date_str, errors='raise', format=date_format)
    except ValueError as err:
        # The exact message wording may differ between pandas versions, so
        # match the common substring rather than a fixed prefix.
        if 'unconverted data remains' not in str(err):
            raise
    # The previous fallback sliced a result variable that was never assigned
    # (UnboundLocalError). exact=False lets the format match anywhere in the
    # string, which drops the trailing data pandas complained about.
    return pd.to_datetime(
        date_str, errors='raise', format=date_format, exact=False
    )
# Register date_convert as a row-at-a-time Spark UDF whose output type is TimestampType.
date_convert_udf = F.udf(date_convert, returnType=TimestampType())

In [8]:
# Split a sample "date time" string into its date part (dt) and time part (ht).
dt, ht = '04/01/2014 00:11:00'.partition(' ')[::2]

In [9]:
# Check that pandas accepts the non-zero-padded month/day/hour used in the CSV.
pd.to_datetime('4/1/2014 0:11:00', format='%m/%d/%Y %H:%M:%S', errors='raise')

Timestamp('2014-04-01 00:11:00')

In [10]:
# Sanity check of the converter on a zero-padded sample string.
date_convert(date_str='04/01/2014 00:11:00')

Timestamp('2014-04-01 00:11:00')

In [11]:
# Read the April 2014 Uber pickups CSV: the first row holds the column names
# and Spark infers the column types from the data.
uberpickabr14 = spark.read.csv(
    'file:/c:/users/lucatic/documents/datasets/uber_pickups14/uber-raw-data-apr14.csv',
    header=True,
    inferSchema=True,
)
uberpickabr14.show()

+----------------+-------+--------+------+
|       Date/Time|    Lat|     Lon|  Base|
+----------------+-------+--------+------+
|4/1/2014 0:11:00| 40.769|-73.9549|B02512|
|4/1/2014 0:17:00|40.7267|-74.0345|B02512|
|4/1/2014 0:21:00|40.7316|-73.9873|B02512|
|4/1/2014 0:28:00|40.7588|-73.9776|B02512|
|4/1/2014 0:33:00|40.7594|-73.9722|B02512|
|4/1/2014 0:33:00|40.7383|-74.0403|B02512|
|4/1/2014 0:39:00|40.7223|-73.9887|B02512|
|4/1/2014 0:45:00| 40.762| -73.979|B02512|
|4/1/2014 0:55:00|40.7524| -73.996|B02512|
|4/1/2014 1:01:00|40.7575|-73.9846|B02512|
|4/1/2014 1:19:00|40.7256|-73.9869|B02512|
|4/1/2014 1:48:00|40.7591|-73.9684|B02512|
|4/1/2014 1:49:00|40.7271|-73.9803|B02512|
|4/1/2014 2:11:00|40.6463|-73.7896|B02512|
|4/1/2014 2:25:00|40.7564|-73.9167|B02512|
|4/1/2014 2:31:00|40.7666|-73.9531|B02512|
|4/1/2014 2:43:00| 40.758|-73.9761|B02512|
|4/1/2014 3:22:00|40.7238|-73.9821|B02512|
|4/1/2014 3:35:00|40.7531|-74.0039|B02512|
|4/1/2014 3:35:00|40.7389|-74.0393|B02512|
+----------

In [12]:
# Show the structure of the dataset (column names and inferred types).
uberpickabr14.printSchema()

root
 |-- Date/Time: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Lon: double (nullable = true)
 |-- Base: string (nullable = true)



In [13]:
# Number of rows in the dataset.
uberpickabr14.count()

564516

In [14]:
# Summary statistics (count, mean, stddev, min, quartiles, max) for every column.
uberpickabr14.summary().show()

+-------+----------------+-------------------+-------------------+------+
|summary|       Date/Time|                Lat|                Lon|  Base|
+-------+----------------+-------------------+-------------------+------+
|  count|          564516|             564516|             564516|564516|
|   mean|            null|  40.74000520746832| -73.97681683903477|  null|
| stddev|            null|0.03608320502016508|0.05042582837278078|  null|
|    min|4/1/2014 0:00:00|            40.0729|           -74.7733|B02512|
|    25%|            null|            40.7225|           -73.9977|  null|
|    50%|            null|            40.7425|           -73.9847|  null|
|    75%|            null|            40.7607|             -73.97|  null|
|    max|4/9/2014 9:59:00|            42.1166|           -72.0666|B02764|
+-------+----------------+-------------------+-------------------+------+



In [15]:
## Funciones definidas por usuario en pandas y que puedes ejecutar con spark
# Declaramos la orden


# date_convert_pudf() 

In [16]:
# Apply the row-at-a-time UDF to the raw date column and show the converted values.
uberpickabr14.select(
    date_convert_udf(F.col('Date/Time'))
).show()

+-----------------------+
|date_convert(Date/Time)|
+-----------------------+
|    2014-04-01 00:11:00|
|    2014-04-01 00:17:00|
|    2014-04-01 00:21:00|
|    2014-04-01 00:28:00|
|    2014-04-01 00:33:00|
|    2014-04-01 00:33:00|
|    2014-04-01 00:39:00|
|    2014-04-01 00:45:00|
|    2014-04-01 00:55:00|
|    2014-04-01 01:01:00|
|    2014-04-01 01:19:00|
|    2014-04-01 01:48:00|
|    2014-04-01 01:49:00|
|    2014-04-01 02:11:00|
|    2014-04-01 02:25:00|
|    2014-04-01 02:31:00|
|    2014-04-01 02:43:00|
|    2014-04-01 03:22:00|
|    2014-04-01 03:35:00|
|    2014-04-01 03:35:00|
+-----------------------+
only showing top 20 rows



In [17]:
# Keep the raw date string and the base, and add the converted timestamp as 'datetime'.
uberpickabr14.select(
    'Date/Time',
    'Base',
    date_convert_udf(F.col('Date/Time')).alias('datetime'),
).show(6)

+----------------+------+-------------------+
|       Date/Time|  Base|           datetime|
+----------------+------+-------------------+
|4/1/2014 0:11:00|B02512|2014-04-01 00:11:00|
|4/1/2014 0:17:00|B02512|2014-04-01 00:17:00|
|4/1/2014 0:21:00|B02512|2014-04-01 00:21:00|
|4/1/2014 0:28:00|B02512|2014-04-01 00:28:00|
|4/1/2014 0:33:00|B02512|2014-04-01 00:33:00|
|4/1/2014 0:33:00|B02512|2014-04-01 00:33:00|
+----------------+------+-------------------+
only showing top 6 rows



In [18]:
# Scratch cell: lower-case the CSV path (the result matches the path passed to spark.read.csv).
str.lower('file:/C:/Users/Lucatic/Documents/Datasets/UBER_PICKUPS14/uber-raw-data-apr14.csv')

'file:/c:/users/lucatic/documents/datasets/uber_pickups14/uber-raw-data-apr14.csv'

In [19]:
## Build UBERPICKS: keep the original columns and add the converted timestamp as 'datetime'.
UBERPICKS = uberpickabr14.select(
    'Date/Time',
    'Base',
    'Lat',
    'Lon',
    date_convert_udf(F.col('Date/Time')).alias('datetime'),  # alias names the new column
)
UBERPICKS.show(5)

+----------------+------+-------+--------+-------------------+
|       Date/Time|  Base|    Lat|     Lon|           datetime|
+----------------+------+-------+--------+-------------------+
|4/1/2014 0:11:00|B02512| 40.769|-73.9549|2014-04-01 00:11:00|
|4/1/2014 0:17:00|B02512|40.7267|-74.0345|2014-04-01 00:17:00|
|4/1/2014 0:21:00|B02512|40.7316|-73.9873|2014-04-01 00:21:00|
|4/1/2014 0:28:00|B02512|40.7588|-73.9776|2014-04-01 00:28:00|
|4/1/2014 0:33:00|B02512|40.7594|-73.9722|2014-04-01 00:33:00|
+----------------+------+-------+--------+-------------------+
only showing top 5 rows



In [55]:
## Simpler variant: withColumn adds (or replaces) a column, like dplyr's mutate,
## and withColumnRenamed renames the raw date string column.
(
    uberpickabr14
    .withColumn('datetime', date_convert_udf(uberpickabr14['Date/Time']))
    .withColumnRenamed("Date/Time", "FechaStr")
    .show(5)
)

+----------------+-------+--------+------+-------------------+
|        FechaStr|    Lat|     Lon|  Base|           datetime|
+----------------+-------+--------+------+-------------------+
|4/1/2014 0:11:00| 40.769|-73.9549|B02512|2014-04-01 00:11:00|
|4/1/2014 0:17:00|40.7267|-74.0345|B02512|2014-04-01 00:17:00|
|4/1/2014 0:21:00|40.7316|-73.9873|B02512|2014-04-01 00:21:00|
|4/1/2014 0:28:00|40.7588|-73.9776|B02512|2014-04-01 00:28:00|
|4/1/2014 0:33:00|40.7594|-73.9722|B02512|2014-04-01 00:33:00|
+----------------+-------+--------+------+-------------------+
only showing top 5 rows



In [53]:
# Return the weekday name of a timestamp (used through a row-at-a-time Spark UDF).
def dayweek(col):
    """Return the full weekday name of ``col`` (e.g. ``'Tuesday'``).

    Args:
        col: a ``datetime``/``date`` value, or ``None``.

    Returns:
        The weekday name, or ``None`` when ``col`` is ``None``. A Spark UDF
        receives ``None`` for null timestamps; previously that raised
        AttributeError and failed the whole task.

    NOTE: ``%A`` follows the current locale, so the name may not be English.
    """
    if col is None:
        return None
    return col.strftime('%A')
# Row-at-a-time UDF: slower than a pandas_udf (which vectorizes), but the
# pandas_udf version did not work here without more complex code.
dayweek_udf = F.udf(dayweek, returnType=StringType())

In [54]:
# Add the day-month string (FECHA) and the weekday name (D) derived from 'datetime'.
UBERPICKS = (
    UBERPICKS
    .withColumn('FECHA', F.date_format('datetime', 'dd-MM'))
    .withColumn('D', dayweek_udf('datetime'))
)

UBERPICKS.show(5)

+----------------+------+-------+--------+-------------------+-----+-------+
|       Date/Time|  Base|    Lat|     Lon|           datetime|FECHA|      D|
+----------------+------+-------+--------+-------------------+-----+-------+
|4/1/2014 0:11:00|B02512| 40.769|-73.9549|2014-04-01 00:11:00|01-04|Tuesday|
|4/1/2014 0:17:00|B02512|40.7267|-74.0345|2014-04-01 00:17:00|01-04|Tuesday|
|4/1/2014 0:21:00|B02512|40.7316|-73.9873|2014-04-01 00:21:00|01-04|Tuesday|
|4/1/2014 0:28:00|B02512|40.7588|-73.9776|2014-04-01 00:28:00|01-04|Tuesday|
|4/1/2014 0:33:00|B02512|40.7594|-73.9722|2014-04-01 00:33:00|01-04|Tuesday|
|4/1/2014 0:33:00|B02512|40.7383|-74.0403|2014-04-01 00:33:00|01-04|Tuesday|
|4/1/2014 0:39:00|B02512|40.7223|-73.9887|2014-04-01 00:39:00|01-04|Tuesday|
|4/1/2014 0:45:00|B02512| 40.762| -73.979|2014-04-01 00:45:00|01-04|Tuesday|
|4/1/2014 0:55:00|B02512|40.7524| -73.996|2014-04-01 00:55:00|01-04|Tuesday|
|4/1/2014 1:01:00|B02512|40.7575|-73.9846|2014-04-01 01:01:00|01-04|Tuesday|

## FILTROS

In [21]:
## Filter by date: keep pickups strictly after 2014-04-02 02:30.
UBERPICKS.filter(F.col('datetime') > datetime(2014, 4, 2, 2, 30)).show()

+----------------+------+-------+--------+-------------------+
|       Date/Time|  Base|    Lat|     Lon|           datetime|
+----------------+------+-------+--------+-------------------+
|4/2/2014 2:33:00|B02512|40.7383|-73.9839|2014-04-02 02:33:00|
|4/2/2014 3:18:00|B02512|40.7339|-73.9949|2014-04-02 03:18:00|
|4/2/2014 3:33:00|B02512|40.7402|-74.0059|2014-04-02 03:33:00|
|4/2/2014 3:43:00|B02512|40.7731|-73.9584|2014-04-02 03:43:00|
|4/2/2014 3:49:00|B02512|40.6461|-73.7769|2014-04-02 03:49:00|
|4/2/2014 3:56:00|B02512|40.7475|-73.9565|2014-04-02 03:56:00|
|4/2/2014 4:18:00|B02512|40.7471|-74.0023|2014-04-02 04:18:00|
|4/2/2014 4:22:00|B02512|40.7816| -73.956|2014-04-02 04:22:00|
|4/2/2014 4:25:00|B02512|40.7788|-73.9481|2014-04-02 04:25:00|
|4/2/2014 4:26:00|B02512|40.6948|-74.1779|2014-04-02 04:26:00|
|4/2/2014 4:29:00|B02512|40.7583|-73.9665|2014-04-02 04:29:00|
|4/2/2014 4:36:00|B02512|40.7258|-73.9877|2014-04-02 04:36:00|
|4/2/2014 4:40:00|B02512|40.7655|-73.9932|2014-04-02 04

In [22]:
# Date-range filter: after 2014-04-01 00:00 (exclusive) and up to 2014-04-02 05:00 (inclusive).
# NOTE(review): the strict '>' drops pickups at exactly 2014-04-01 00:00:00;
# use '>=' if the whole first day was intended.
UBERPICKS.filter(
    (UBERPICKS.datetime > datetime(2014, 4, 1))
    & (UBERPICKS.datetime <= datetime(2014, 4, 2, 5))
).show(5)

+----------------+------+-------+--------+-------------------+
|       Date/Time|  Base|    Lat|     Lon|           datetime|
+----------------+------+-------+--------+-------------------+
|4/1/2014 0:11:00|B02512| 40.769|-73.9549|2014-04-01 00:11:00|
|4/1/2014 0:17:00|B02512|40.7267|-74.0345|2014-04-01 00:17:00|
|4/1/2014 0:21:00|B02512|40.7316|-73.9873|2014-04-01 00:21:00|
|4/1/2014 0:28:00|B02512|40.7588|-73.9776|2014-04-01 00:28:00|
|4/1/2014 0:33:00|B02512|40.7594|-73.9722|2014-04-01 00:33:00|
+----------------+------+-------+--------+-------------------+
only showing top 5 rows



In [None]:
# TODO: unfinished cell — `UBERPICKS.` on its own is a SyntaxError.
# Complete the expression (a column or DataFrame method) or delete the cell.
# UBERPICKS.

In [None]:
# TODO: unfinished cell — a bare `@udf` decorator with no function after it is
# a SyntaxError, and `udf` is not imported by name here (only `F.udf`).
# The decorator form of a UDF looks like:
#
# @F.udf(returnType=StringType())
# def some_function(value):
#     ...

### No nos olvidemos de acabar con la sesión spark!!

In [None]:
# Stop the SparkContext; the `spark` session was built on it, so later cells
# using `sc` or `spark` are expected to fail until a new context is created.
sc.stop()

In [None]:
# Simple check that Spark works: Monte Carlo estimate of pi.
import random

from pyspark import SparkConf, SparkContext

num_samples = 100000000

def inside(p):
    """Draw a random point in the unit square; True if it lies inside the unit circle.

    ``p`` (the sample index) is ignored; it only determines how many draws are made.
    """
    x, y = random.random(), random.random()
    return x*x + y*y < 1

# The previous cell stops `sc`; calling methods on a stopped context fails.
# Reuse an active context if there is one, otherwise start a new local one
# on all cores (the conf is ignored when a context is already active).
sc = SparkContext.getOrCreate(
    conf=SparkConf().setAppName('pi').setMaster('local[*]')
)

count = sc.parallelize(range(0, num_samples)).filter(inside).count()
# 4.0 forces true division even under Python 2 semantics.
pi = 4.0 * count / num_samples

print(pi)

sc.stop()