In [8]:
import os

import findspark

# Make `pyspark` importable by pointing findspark at the local Spark installation.
# SPARK_HOME wins when it is set; otherwise fall back to the original Windows path.
findspark.init(os.environ.get('SPARK_HOME', 'C://Spark'))

In [9]:
from pyspark.sql import SparkSession

# Local session on all available cores; getOrCreate() reuses an existing session if one is active.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Streaming from File')
    .getOrCreate()
)
spark

In [10]:
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Column layout of the incoming records as (column name, Spark type) pairs.
# Every field is declared nullable.
input_csv_schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in (
        ("registration_dttm", StringType()),
        ("id", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("email", StringType()),
        ("gender", StringType()),
        ("ip_address", StringType()),
        ("cc", StringType()),
        ("country", StringType()),
        ("birthdate", StringType()),
        ("salary", DoubleType()),
        ("title", StringType()),
        ("comments", StringType()),
    )
])

In [5]:
# Incrementally read the CSV files that appear under ./input_data/csv.
# The schema is supplied explicitly (input_csv_schema) rather than inferred,
# and the first line of each file is treated as a header.
stream_df = (
    spark.readStream
    .format('csv')
    .schema(input_csv_schema)
    .option('header', True)
    .load(path="./input_data/csv")
)

stream_df.printSchema()

root
 |-- registration_dttm: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- country: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- title: string (nullable = true)
 |-- comments: string (nullable = true)



In [6]:
# Continuously write the incoming CSV rows to ./destination_path/ as CSV files.
# The output mode is set with outputMode(); file sinks support "append" only.
# The checkpoint directory lets a restarted query resume where it stopped.
# For interactive debugging, the sink can be swapped for `.format('console')`
# (optionally with `.trigger(processingTime='2 seconds')`).
write_df = (
    stream_df.writeStream
    .format("csv")
    .option("path", "./destination_path/")
    .option("checkpointLocation", "./checkpoint_path")
    .outputMode("append")
    .start()
)

In [11]:
# Stop every streaming query still running on this session (e.g. `write_df`) before
# shutting the session down, so queries end via their own stop() rather than having
# the SparkContext torn down underneath them.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()

In [25]:
# Incrementally read the JSON files under ./input_data/json, reusing the CSV
# column layout (input_csv_schema). No 'header' option is set: that is a CSV-only
# setting, and JSON records carry their own field names.
# NOTE(review): if `spark.stop()` was run in an earlier cell, `spark` must be
# re-created before this cell will work.
stream_df = (
    spark.readStream
    .format('json')
    .schema(input_csv_schema)
    .load(path="./input_data/json")
)

stream_df.printSchema()

root
 |-- registration_dttm: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- country: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- title: string (nullable = true)
 |-- comments: string (nullable = true)



In [12]:
import os

import findspark
# Must run before importing pyspark. SPARK_HOME wins when set; otherwise use the original path.
findspark.init(os.environ.get('SPARK_HOME', 'C://Spark'))

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# This cell is self-contained: it (re)creates the session and the schema,
# so it can run after an earlier session has been stopped.
spark = SparkSession.builder.master('local[*]').appName('File Streaming JSON').getOrCreate()

input_csv_schema = StructType([
    StructField("registration_dttm", StringType(), True),
    StructField("id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("ip_address", StringType(), True),
    StructField("cc", StringType(), True),
    StructField("country", StringType(), True),
    StructField("birthdate", StringType(), True),
    StructField("salary", DoubleType(), True),
    StructField("title", StringType(), True),
    StructField("comments", StringType(), True)
])

# Stream the JSON files under input_data/json. 'header' is a CSV-only option, so none is set.
stream_df = spark.readStream.format('json').schema(input_csv_schema).load(path='input_data/json')

# Per-country row counts, most frequent first. This is only a query definition and
# is not started here; sorting a streaming aggregation requires "complete" output mode.
df1 = stream_df.groupBy('country').count().orderBy('count', ascending=False)

# Every streaming query needs its own checkpoint directory. The CSV query in this
# notebook already uses ./checkpoint_path and ./destination_path/. Sharing them would
# make this query read the CSV query's offsets/sink log and mix JSON files into the
# CSV output, so the JSON query writes to dedicated locations.
write_df = (
    stream_df.writeStream
    .format("json")
    .option("path", "./destination_path_json/")
    .option("checkpointLocation", "./checkpoint_path_json")
    .start()
)

In [46]:
# Stop every streaming query still running on this session (e.g. `write_df`) before
# shutting the session down, so queries end via their own stop() rather than having
# the SparkContext torn down underneath them.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()