In [3]:
import pyspark
import os
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.functions import when, month, udf,monotonically_increasing_id

In [4]:
credentials_location = '/home/rohit/.gc/de-cred.json'

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.executor.memory","8g") \
    .set("spark.driver.memory","10g") \
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [5]:
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

23/04/04 17:06:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [6]:
spark = SparkSession.builder \
        .config(conf=sc.getConf()) \
        .getOrCreate()
                   

In [8]:
year = 2022 
fformat = 'raw'
fmon="jan"
month_mapper= {1:"jan",2:"feb",3:"mar",4:"apr",5:"may",6:"june",7:"july",8:"aug",9:"sept",10:"oct",11:"nov",12:"dec"}


In [9]:
fpath = f"gs://fhvhv-project/{year}/{fformat}/{fmon}/*"
df = spark.read.parquet(fpath)
df.printSchema()

                                                                                

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nul

In [10]:
month_dict= {1:"January",2:"Febuary",3:"March",4:"April",5:"May",6:"June",7:"July",8:"August",9:"September",10:"October",11:"November",12:"December"}
func = udf(lambda x: month_dict[x])


In [11]:
for i in range(1,13):
    fmon = month_mapper[i]
    fpath = f"gs://fhvhv-project/{year}/{fformat}/{fmon}/*"
    print(fpath)
    df = spark.read.parquet(fpath)
    df = df.repartition(18)
    columns = df.columns
    updated_df = df.withColumn('pickup_date',F.to_date(df['pickup_datetime'])) \
            .withColumn('dropoff_date', F.to_date(df['dropoff_datetime']))\
            .withColumn('service_name', when(df['hvfhs_license_num']=='HV0003', "Uber").when(df['hvfhs_license_num']=='HV0005',"Lyft").otherwise("Others"))\
            .withColumn("trip_month", F.lit(month_dict[i]))\
            .drop('pickup_datetime')\
            .drop('dropoff_datetime')\
            .na.fill(value=0,subset=["airport_fee","tips","trip_miles","trip_time","tolls","base_passenger_fare","driver_pay","bcf"])\
            .na.fill(value='N',subset=["access_a_ride_flag","wav_match_flag","wav_request_flag","shared_match_flag","shared_request_flag"])
    
    print(f"dataframe updated! --{fmon}")
    updated_df.coalesce(10).write \
        .mode('overwrite').format('parquet').save(f'{output_data}/trips_table/{fmon}')


gs://fhvhv-project/2022/raw/jan/*



[Stage 1:>                                                          (0 + 1) / 1]

                                                                                

dataframe updated! --jan


23/04/04 17:10:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

gs://fhvhv-project/2022/raw/feb/*


                                                                                

dataframe updated! --feb


                                                                                

gs://fhvhv-project/2022/raw/mar/*
dataframe updated! --mar


                                                                                

gs://fhvhv-project/2022/raw/apr/*
dataframe updated! --apr


                                                                                

gs://fhvhv-project/2022/raw/may/*
dataframe updated! --may


                                                                                

gs://fhvhv-project/2022/raw/june/*
dataframe updated! --june


                                                                                

gs://fhvhv-project/2022/raw/july/*
dataframe updated! --july


                                                                                

gs://fhvhv-project/2022/raw/aug/*
dataframe updated! --aug


                                                                                

gs://fhvhv-project/2022/raw/sept/*
dataframe updated! --sept


                                                                                

gs://fhvhv-project/2022/raw/oct/*



[Stage 37:>                                                         (0 + 1) / 1]

                                                                                

dataframe updated! --oct


                                                                                

gs://fhvhv-project/2022/raw/nov/*



[Stage 41:>                                                         (0 + 1) / 1]

                                                                                

dataframe updated! --nov


                                                                                

gs://fhvhv-project/2022/raw/dec/*
dataframe updated! --dec


                                                                                

In [9]:
updated_df = df.withColumn('pickup_date',F.to_date(df['pickup_datetime'])) \
            .withColumn('dropoff_date', F.to_date(df['dropoff_datetime']))\
            .withColumn('service_name', when(df['hvfhs_license_num']=='HV0003', "Uber").when(df['hvfhs_license_num']=='HV0005',"Lyft").otherwise("Others"))\
            .withColumn("trip_month", func(month(df['pickup_datetime'])))\
            .withColumn("unique_id",monotonically_increasing_id())\
            .drop('pickup_datetime')\
            .drop('dropoff_datetime')\
            .na.fill(value=0,subset=["airport_fee","tips","trip_miles","trip_time","tolls","base_passenger_fare","driver_pay","bcf"])\
            .na.fill(value='N',subset=["access_a_ride_flag","wav_match_flag","wav_request_flag","shared_match_flag","shared_request_flag"])

In [16]:
new_df.show(5)

23/04/02 17:39:29 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/04/02 17:40:11 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:40:11 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.




23/04/02 17:40:11 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:40:11 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.




23/04/02 17:40:11 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:40:12 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:40:12 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:40:12 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.




23/04/02 17:40:13 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:40:13 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:40:13 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:40:13 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:40:59 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:40:59 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:43:57 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.




23/04/02 17:43:57 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:43:57 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:43:58 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:43:58 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:43:58 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.




23/04/02 17:43:58 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:43:59 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:43:59 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.




23/04/02 17:43:59 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:43:59 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.




23/04/02 17:43:59 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:44:00 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:44:00 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:44:13 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.




23/04/02 17:44:18 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
23/04/02 17:44:18 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.



23/04/02 17:47:58 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
23/04/02 17:47:58 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
23/04/02 17:47:59 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:47:59 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:48:00 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:48:00 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:48:00 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:48:00 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:48:01 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:48:01 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/04/02 17:48:01 WARN TaskMem



23/04/02 17:48:18 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+-----------+------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|pickup_date|dropoff_date|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+


                                                                                

In [9]:
updated_df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = false)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = false)
 |-- tolls: double (nullable = false)
 |-- bcf: double (nullable = false)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = false)
 |-- tips: double (nullable = false)
 |-- driver_pay: double (nullable = false)
 |-- shared_request_flag: string (nullable = false)
 |-- shared_match_flag: string (nullable = false)
 |-- access_a_ride_flag: string (nullable = false)
 |-- wav_request_flag: string (nullable = false)
 |-- wav_match_flag: string 

In [43]:
updated_df.groupBy('service_name','trip_month').count().show()



+------------+----------+--------+
|service_name|trip_month|   count|
+------------+----------+--------+
|        Uber|     March|13136268|
|        Lyft|   Febuary| 4578385|
|        Lyft|  December| 5657939|
|        Uber|      July|12575713|
|        Uber|       May|13325434|
|        Uber|   October|14102892|
|        Lyft|   October| 5203198|
|        Uber|  November|12968005|
|        Uber|      June|13049858|
|        Lyft|      June| 4730217|
|        Lyft|       May| 4831901|
|        Uber|     April|13010980|
|        Lyft|     April| 4741581|
|        Lyft|  November| 5117891|
|        Uber|   Febuary|11440898|
|        Uber|    August|12500703|
|        Uber| September|12902315|
|        Lyft|   January| 3925255|
|        Lyft|     March| 5317280|
|        Uber|   January|10826336|
+------------+----------+--------+
only showing top 20 rows





                                                                                

In [14]:
updated_df.count()

                                                                                

212416083

In [None]:
size = 212416083
chunksize = 1000000

In [10]:
columns = updated_df.columns
# updated_df = updated_df.coalesce(20)
# trip_table = updated_df.select(columns)

output_data = "gs://fhvhv-project/output_data"

updated_df.coalesce(20).write.partitionBy('trip_month')\
.mode('overwrite').format('parquet').save(f'{output_data}/trips_table')
                             

23/04/03 18:08:55 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/04/03 18:19:22 ERROR TaskMemoryManager: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@251419d8
java.io.IOException: No space left on device
	at java.base/java.io.FileOutputStream.writeBytes(Native Method)
	at java.base/java.io.FileOutputStream.write(FileOutputStream.java:354)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskB

23/04/03 18:19:22 WARN TaskSetManager: Lost task 33.0 in stage 1.0 (TID 34) (data-eng-vm.asia-south2-a.c.data-engineering-rj.internal executor driver): TaskKilled (Stage cancelled)
23/04/03 18:19:22 WARN TaskSetManager: Lost task 27.0 in stage 1.0 (TID 28) (data-eng-vm.asia-south2-a.c.data-engineering-rj.internal executor driver): TaskKilled (Stage cancelled)
23/04/03 18:19:22 WARN TaskSetManager: Lost task 30.0 in stage 1.0 (TID 31) (data-eng-vm.asia-south2-a.c.data-engineering-rj.internal executor driver): TaskKilled (Stage cancelled)


Py4JJavaError: An error occurred while calling o274.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 24 in stage 1.0 failed 1 times, most recent failure: Lost task 24.0 in stage 1.0 (TID 25) (data-eng-vm.asia-south2-a.c.data-engineering-rj.internal executor driver): org.apache.spark.memory.SparkOutOfMemoryError: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@251419d8 : No space left on device
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:230)
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:297)
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:95)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:391)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:447)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$10(ShuffleExchangeExec.scala:376)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@251419d8 : No space left on device
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:230)
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:297)
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:95)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:391)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:447)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$10(ShuffleExchangeExec.scala:376)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)


23/04/03 18:19:24 WARN TaskSetManager: Lost task 34.0 in stage 1.0 (TID 35) (data-eng-vm.asia-south2-a.c.data-engineering-rj.internal executor driver): TaskKilled (Stage cancelled)


In [11]:
spark

In [13]:
df -h/

SyntaxError: invalid syntax (4039927159.py, line 1)