In [62]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [63]:
import pyspark
from pyspark.sql import SparkSession

In [64]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()


In [65]:
df = spark.read \
    .option("header", "true") \
    .csv('fhv_tripdata_2021-01.csv')

In [66]:
df.head(5)

[Row(dispatching_base_num='B00009', pickup_datetime='2021-01-01 00:27:00', dropoff_datetime='2021-01-01 00:44:00', PULocationID=None, DOLocationID=None, SR_Flag=None, Affiliated_base_number='B00009'),
 Row(dispatching_base_num='B00009', pickup_datetime='2021-01-01 00:50:00', dropoff_datetime='2021-01-01 01:07:00', PULocationID=None, DOLocationID=None, SR_Flag=None, Affiliated_base_number='B00009'),
 Row(dispatching_base_num='B00013', pickup_datetime='2021-01-01 00:01:00', dropoff_datetime='2021-01-01 01:51:00', PULocationID=None, DOLocationID=None, SR_Flag=None, Affiliated_base_number='B00013'),
 Row(dispatching_base_num='B00037', pickup_datetime='2021-01-01 00:13:09', dropoff_datetime='2021-01-01 00:21:26', PULocationID=None, DOLocationID='72', SR_Flag=None, Affiliated_base_number='B00037'),
 Row(dispatching_base_num='B00037', pickup_datetime='2021-01-01 00:38:31', dropoff_datetime='2021-01-01 00:53:44', PULocationID=None, DOLocationID='61', SR_Flag=None, Affiliated_base_number='B0003

In [67]:
from pyspark.sql import types

In [68]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [69]:
##Provide a schema to our csv file

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2021-01.csv')

In [70]:
df.head(5)

[Row(hvfhs_license_num='B00009', dispatching_base_num='2021-01-01 00:27:00', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 44), dropoff_datetime=None, PULocationID=None, DOLocationID=None, SR_Flag='B00009'),
 Row(hvfhs_license_num='B00009', dispatching_base_num='2021-01-01 00:50:00', pickup_datetime=datetime.datetime(2021, 1, 1, 1, 7), dropoff_datetime=None, PULocationID=None, DOLocationID=None, SR_Flag='B00009'),
 Row(hvfhs_license_num='B00013', dispatching_base_num='2021-01-01 00:01:00', pickup_datetime=datetime.datetime(2021, 1, 1, 1, 51), dropoff_datetime=None, PULocationID=None, DOLocationID=None, SR_Flag='B00013'),
 Row(hvfhs_license_num='B00037', dispatching_base_num='2021-01-01 00:13:09', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 21, 26), dropoff_datetime=None, PULocationID=72, DOLocationID=None, SR_Flag='B00037'),
 Row(hvfhs_license_num='B00037', dispatching_base_num='2021-01-01 00:38:31', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 53, 44), dropoff_datetime=Non

In [71]:
## Break a file into chunks of it (partitions)

df = df.repartition(24)

In [72]:
df.write    

<pyspark.sql.readwriter.DataFrameWriter at 0x22d851e11c0>

In [73]:
df = spark.read.parquet('fhvhv/2021/01/')

In [74]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



SELECT * FROM df WHERE hvfhs_license_num = B02437

In [75]:
df.select('*').filter(df.hvfhs_license_num =='B02437') \
.show()

+-----------------+--------------------+-------------------+----------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+----------------+------------+------------+-------+
|           B02437| 2021-01-04 18:59:00|2021-01-04 19:01:03|            null|         247|        null| B02437|
|           B02437| 2021-01-04 09:45:34|2021-01-04 09:52:31|            null|         247|        null| B02437|
|           B02437| 2021-01-01 08:37:11|2021-01-01 08:57:15|            null|         265|        null| B02437|
|           B02437| 2021-01-03 19:45:57|2021-01-03 19:54:06|            null|         247|        null| B02437|
|           B02437| 2021-01-01 12:06:43|2021-01-01 12:14:07|            null|          78|        null| B02437|
|           B02437| 2021-01-05 11:21:51|2021-01-05 11:25:10|            null|          20|        null| 

### Transformations are considered "lazy" and they do not executed inmediately, on the other hand Actions (eager) do.

In [76]:
from pyspark.sql import functions as F

In [77]:
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

In [78]:
crazy_stuff('B02884')

's/b44'

In [79]:
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [81]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('base_id','pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

Py4JJavaError: An error occurred while calling o479.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 41.0 failed 1 times, most recent failure: Lost task 0.0 in stage 41.0 (TID 85) (LAPTOP-7431CGMI executor driver): java.io.IOException: Cannot run program "python3": CreateProcess error=2, El sistema no puede encontrar el archivo especificado
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:167)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:157)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: CreateProcess error=2, El sistema no puede encontrar el archivo especificado
	at java.base/java.lang.ProcessImpl.create(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:487)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:154)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at jdk.internal.reflect.GeneratedMethodAccessor111.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Cannot run program "python3": CreateProcess error=2, El sistema no puede encontrar el archivo especificado
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:167)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:157)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.io.IOException: CreateProcess error=2, El sistema no puede encontrar el archivo especificado
	at java.base/java.lang.ProcessImpl.create(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:487)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:154)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
	... 26 more


In [82]:
df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
  .filter(df.hvfhs_license_num == 'HV0003')

DataFrame[pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int]