In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.types import IntegerType, DateType, StringType, StructType, MapType
import pyspark.sql.functions as psf
from pyspark.sql.window import Window
from pyspark.sql.functions import col, desc
from env import get_env


def readData(spark, file_name):
    """Open a streaming JSON source over every file matching ``<file_name>_*``.

    Prints the prefix being read, then configures a multiline JSON stream that
    admits at most one new file per micro-batch.

    Args:
        spark: the active SparkSession.
        file_name: path prefix of the input files; ``_*`` is appended to it.

    Returns:
        The streaming DataFrame produced by ``DataStreamReader.json``.
    """
    print("****** File Name : ", file_name)

    source_glob = f"{file_name}_*"

    reader = spark.readStream
    # NOTE(review): "inferSchema" is a CSV reader option; for streaming JSON the
    # schema inference switch is spark.sql.streaming.schemaInference (set on the
    # session builder) — confirm this option has any effect here.
    reader = reader.option("inferSchema", "true")
    reader = reader.option("maxFilesPerTrigger", 1)
    reader = reader.option("multiline", "true")

    return reader.json(source_glob)


def writeData(input_df, checkpoint_path, table_name, output_format="parquet"):
    """Start a streaming append of ``input_df`` into the table ``table_name``.

    Args:
        input_df: streaming DataFrame to persist.
        checkpoint_path: checkpoint location for the query (formatted to str).
        table_name: name of the target table passed to ``toTable``.
        output_format: data source format used for the table; defaults to
            "parquet", the format this function always used before.

    Returns:
        Whatever ``DataStreamWriter.toTable`` returns (the started
        StreamingQuery), so callers can await, monitor or stop it.
    """
    return input_df \
        .writeStream \
        .format(output_format) \
        .outputMode("append") \
        .option("checkpointLocation", f"{checkpoint_path}") \
        .toTable(f"{table_name}")


def load_vehicles_event(spark, file_name, checkpoint_path):
    """Stream vehicle events from JSON input into the ``vehicle_event`` table.

    Explodes the ``vehicles_events`` array column so each element becomes one
    row, promotes the element's struct fields to top-level columns, prints the
    resulting schema and starts the streaming write.

    Args:
        spark: the active SparkSession.
        file_name: input path prefix handed to ``readData``.
        checkpoint_path: checkpoint location for the streaming query.

    Returns:
        The query returned by ``writeData``. It used to be discarded (the
        function returned None), which left callers no handle to stop it.
    """
    raw_df = readData(spark, file_name)

    # One output row per element of the `vehicles_events` array, struct flattened.
    events_df = raw_df \
        .select(psf.explode('vehicles_events').alias('tmp')) \
        .select('tmp.*')

    events_df.printSchema()

    return writeData(events_df, checkpoint_path, table_name='vehicle_event')


def load_vehicles_status(spark, file_name, checkpoint_path):
    """Stream vehicle status records from JSON input into ``vehicle_status``.

    Explodes the ``vehicle_status`` array column so each element becomes one
    row, promotes the element's struct fields to top-level columns, prints the
    resulting schema and starts the streaming write.

    Args:
        spark: the active SparkSession.
        file_name: input path prefix handed to ``readData``.
        checkpoint_path: checkpoint location for the streaming query.

    Returns:
        The query returned by ``writeData``. It used to be discarded (the
        function returned None), which left callers no handle to stop it.
    """
    raw_df = readData(spark, file_name)

    # One output row per element of the `vehicle_status` array, struct flattened.
    status_df = raw_df \
        .select(psf.explode('vehicle_status').alias('tmp')) \
        .select('tmp.*')

    status_df.printSchema()

    return writeData(status_df, checkpoint_path, table_name='vehicle_status')


# Entry point: build a Hive-enabled local SparkSession, start the vehicle event
# and vehicle status streaming loads, then block until either query terminates.
if __name__ == "__main__":
    env = get_env()

    batch_home = env['BATCH_HOME']
    events_file = batch_home + '/' + env['EVENTS_FILENAME']
    status_file = batch_home + '/' + env['STATUS_FILENAME']
    events_checkpoint = env['EVENTS_CHECKPOINT']
    status_checkpoint = env['STATUS_CHECKPOINT']

    # Pin the table warehouse under the project. Otherwise Spark uses
    # "spark-warehouse" relative to the kernel's current working directory,
    # which this notebook changes with os.chdir.
    # NOTE(review): the recorded failure (mkdir of
    # file:/Users/.../spark-warehouse/vehicle_event/_spark_metadata) suggests
    # an existing Hive metastore_db still records a default-database location
    # from another machine; that metastore may also need resetting — confirm.
    warehouse_dir = f"{batch_home}/code/spark-warehouse/"

    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("lemonade") \
        .config("spark.sql.warehouse.dir", warehouse_dir) \
        .config("spark.hadoop.hive.exec.dynamic.partition", "true") \
        .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") \
        .config("spark.sql.streaming.schemaInference", "true") \
        .enableHiveSupport() \
        .getOrCreate()

    spark.sparkContext.setLogLevel('ERROR')

    load_vehicles_event(spark, events_file, events_checkpoint)

    load_vehicles_status(spark, status_file, status_checkpoint)

    # Blocks the caller until any active streaming query stops or fails.
    spark.streams.awaitAnyTermination()


****** File Name :  /home/jovyan/work//input/vehicles_events
root
 |-- event_extra_data:: struct (nullable = true)
 |    |-- boot_time: long (nullable = true)
 |    |-- emergency_call: boolean (nullable = true)
 |    |-- note: string (nullable = true)
 |-- event_source: string (nullable = true)
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_value: string (nullable = true)
 |-- vehicle_id: string (nullable = true)



Py4JJavaError: An error occurred while calling o105.toTable.
: java.io.IOException: mkdir of file:/Users/navneetsingh/Downloads/lemonade/code/spark-warehouse/vehicle_event/_spark_metadata failed
	at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1357)
	at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:185)
	at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:219)
	at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:809)
	at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:805)
	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
	at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:812)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.mkdirs(CheckpointFileManager.scala:320)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:64)
	at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:48)
	at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:86)
	at org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:139)
	at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:326)
	at org.apache.spark.sql.streaming.DataStreamWriter.createV1Sink(DataStreamWriter.scala:432)
	at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:399)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:248)
	at org.apache.spark.sql.streaming.DataStreamWriter.writeToV1Table$1(DataStreamWriter.scala:319)
	at org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:329)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [5]:
!pwd

/home


In [3]:
import os

In [9]:
# os.chdir() always returns None, so its result must never be used as a path
# (the former `BATCH_HOME = os.chdir("..")` bound None, and only looked like a
# str because a later cell had reassigned it). Change to the project
# directory explicitly, then read the working directory back.
os.chdir("/home/jovyan/work")
BATCH_HOME = os.getcwd()