# Init

In [112]:
import findspark
# Locate the local Spark installation and make `pyspark` importable
# (this has to run before the pyspark imports below).
findspark.init()
# Last expression: display the Spark home directory findspark resolved.
findspark.find()

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext, StreamingListener
from pyspark.sql import SparkSession
from pyspark.sql.types import Row, StructField, StringType, IntegerType, FloatType, DoubleType, StructType
from pyspark.sql.functions import udf, from_json

In [65]:
# Local Spark session: master local[*] runs driver and executors inside this
# process with one worker thread per available core.
# NOTE(review): spark.executor.memory and spark.cores.max are documented for
# cluster deploy modes; under local[*] they probably have no effect — confirm.
conf = SparkConf().setAppName('appName').setMaster('local[*]').setAll([
    ("spark.executor.memory", "2g"),
    ("spark.cores.max", "4"),
    ("spark.sql.shuffle.partitions", "4"),
])
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# Display the context summary as the cell output.
sc

# Make dataset

In [12]:
import csv
import os
from io import StringIO, BytesIO

import numpy as np
import pandas as pd

In [5]:
def _to_csv_compat(frame, path_or_buf=None, *, lineterminator, **kwargs):
    """Call ``frame.to_csv`` with a custom line terminator on any pandas version.

    pandas 1.5 renamed ``to_csv``'s ``line_terminator`` keyword to
    ``lineterminator`` and pandas 2.0 removed the old spelling, so a plain
    ``line_terminator=`` call raises ``TypeError`` on current pandas. Try the
    new keyword first and fall back to the old one for pandas < 1.5.

    Returns whatever ``to_csv`` returns (the CSV text when ``path_or_buf`` is None).
    """
    try:
        return frame.to_csv(path_or_buf=path_or_buf, lineterminator=lineterminator, **kwargs)
    except TypeError:
        return frame.to_csv(path_or_buf=path_or_buf, line_terminator=lineterminator, **kwargs)


# Inner payload: a small table serialized with '|' between columns and '\t'
# between records, so the whole table fits into one CSV field with no newlines.
job_data = _to_csv_compat(
    pd.DataFrame({
        'A': [0.1, 0.2, 0.3333333333333333333333333333333333333333],
        'B': ['A', 'B', 'C'],
    }, columns=['A', 'B']),
    index=False, sep='|', lineterminator='\t', float_format='%g',
)

# Outer record: one line per job, both fields quoted strings (QUOTE_NONNUMERIC)
# so JobID keeps its leading zeros and JobData's '|' / '\t' stay inside quotes.
df = pd.DataFrame({
    'JobID': ['00000001'],
    'JobData': [job_data],
}, columns=['JobID', 'JobData'])

# The streaming cells watch ./Data; create it so a fresh checkout can run this cell.
os.makedirs('./Data', exist_ok=True)
_to_csv_compat(df, './Data/Job_00000001.csv', header=False, index=False,
               lineterminator='\n', quoting=csv.QUOTE_NONNUMERIC)

In [38]:
# Read the job file back with every field as text (JobID keeps its leading zeros).
row_df = pd.read_csv('./Data/Job_00000001.csv', header=None, names=['JobID', 'JobData'], dtype=str)
# Unpack the embedded table of the first job: '|' separates columns, '\t' separates records.
job_data_df = pd.read_csv(
    StringIO(row_df.at[0, 'JobData']),
    index_col=False,
    sep='|',
    lineterminator='\t',
)

In [121]:
# Column-wise totals: A is summed numerically, the string column B is concatenated.
job_data_df.sum(axis='index')

A    0.633333
B         ABC
dtype: object

# DStreams (based on RDDs)

In [154]:
BATCH_INTERVAL_SECONDS = 5  # length of each DStream micro-batch
ssc = StreamingContext(sc, batchDuration=BATCH_INTERVAL_SECONDS)

In [155]:
def func(row):
    """Turn one line of a job file into a result record.

    Each line is a CSV record ``"JobID","JobData"`` whose JobData field is itself
    a small table with ``|`` between columns and ``\\t`` between records.

    Args:
        row: one text line of a job file.

    Returns:
        dict with keys ``'JobID'`` (kept as text) and ``'JobResult'``
        (currently the placeholder ``'AAA'``).
    """
    # De-serialize the outer record; dtype=str keeps JobID as text.
    outer = pd.read_csv(StringIO(row), header=None, names=['JobID', 'JobData'], dtype=str)
    job_id = outer.loc[0, 'JobID']

    # De-serialize the embedded table. It is not used yet, but parsing it still
    # surfaces malformed payloads as errors.
    job_data_df = pd.read_csv(StringIO(outer.loc[0, 'JobData']), index_col=False, sep='|', lineterminator='\t')

    # TODO: compute a real result from job_data_df instead of the placeholder.
    return {'JobID': job_id, 'JobResult': 'AAA'}
    
# NOTE: despite its name, result_rdd is a DStream (one RDD per batch interval).
# textFileStream monitors ./Data for new files and yields their lines as text.
result_rdd = (
    ssc.textFileStream('./Data')
    .map(func)
    .cache()
)

In [156]:
# Layout of the per-batch result rows: both fields are nullable strings.
schema = StructType([
    StructField(field_name, StringType(), nullable=True)
    for field_name in ('JobID', 'JobResult')
])

def save_process(time, rdd):
    """Log and display one micro-batch of the job-result stream.

    Intended as a ``foreachRDD`` callback: prints the batch time and row count,
    and for non-empty batches prints up to five raw records and shows them as
    a Spark DataFrame.

    Args:
        time: batch time supplied by Spark Streaming (printed via ``str``).
        rdd: the batch's RDD of result records (presumably the dicts built by
            the upstream ``map`` — confirm against the pipeline).

    NOTE(review): the conversion uses the module-level ``schema`` looked up at
    call time; later cells in this notebook rebind ``schema`` to other layouts,
    so stop the streaming context before running them.
    """
    count = rdd.count()
    print("[{:s}] {:d}".format(str(time), count))
    if count > 0:
        print(rdd.take(5))
        # Only one action (show) runs on this DataFrame, so caching it buys
        # nothing. The previous .cache() was never unpersisted, which left one
        # cached DataFrame behind for every non-empty batch.
        rdd.toDF(schema).show()
    
# Register the per-batch sink; Spark calls it on the driver once per batch interval.
result_rdd.foreachRDD(func=save_process)

In [157]:
AWAIT_TIMEOUT_SECONDS = 10
ssc.start()
# Block this cell for at most AWAIT_TIMEOUT_SECONDS. The stream keeps running in
# the background after this returns (batches keep printing) until ssc.stop().
is_terminated = ssc.awaitTerminationOrTimeout(timeout=AWAIT_TIMEOUT_SECONDS)

[2020-03-01 21:26:40] 0
[2020-03-01 21:26:45] 1
[{'JobID': '00000001', 'JobResult': 'AAA'}]
+--------+---------+
|   JobID|JobResult|
+--------+---------+
|00000001|      AAA|
+--------+---------+

[2020-03-01 21:26:50] 0
[2020-03-01 21:26:55] 0
[2020-03-01 21:27:00] 0
[2020-03-01 21:27:05] 0
[2020-03-01 21:27:10] 0
[2020-03-01 21:27:15] 0
[2020-03-01 21:27:20] 0
[2020-03-01 21:27:25] 0
[2020-03-01 21:27:30] 0
[2020-03-01 21:27:35] 0
[2020-03-01 21:27:40] 0
[2020-03-01 21:27:45] 0
[2020-03-01 21:27:50] 4
[{'JobID': '00000001', 'JobResult': 'AAA'}, {'JobID': '00000001', 'JobResult': 'AAA'}, {'JobID': '00000001', 'JobResult': 'AAA'}, {'JobID': '00000001', 'JobResult': 'AAA'}]
+--------+---------+
|   JobID|JobResult|
+--------+---------+
|00000001|      AAA|
|00000001|      AAA|
|00000001|      AAA|
|00000001|      AAA|
+--------+---------+

[2020-03-01 21:27:55] 0
[2020-03-01 21:28:00] 0
[2020-03-01 21:28:05] 0
[2020-03-01 21:28:10] 0
[2020-03-01 21:28:15] 0
[2020-03-01 21:28:20] 0
[202

In [158]:
# Stop only the streaming side; sc and spark stay alive for the cells below.
ssc.stop(stopSparkContext=False, stopGraceFully=False)

In [100]:
#ssc.awaitTerminationOrTimeout(1)

True

# Structured Streaming (based on Spark DataFrames and Spark SQL)

## JSON

In [8]:
# Streaming DataFrame with a single string column `value`, one row per line of new files in ./Data.
sdf = spark.readStream.format('text').load('./Data')

In [23]:
@udf(returnType=StringType())
def to_upper(s):
    """Return the upper-cased text of one streamed line (debugging UDF).

    Fix: the original returned the tuple ``(s, s.upper())`` even though the
    UDF's declared return type is StringType (the ``@udf`` default). A tuple
    is not a valid value for a string column. This version returns only the
    upper-cased string, and returns ``None`` for a null input instead of
    raising AttributeError. The unused pandas import was dropped.
    """
    import time

    # Debug aids: echo each value and slow processing down so individual rows
    # are observable. print() output typically lands in the Python worker's
    # stdout (the console that launched the kernel), not in the notebook.
    print('-----------')
    print(s)
    time.sleep(2)

    if s is None:
        return None
    return s.upper()

# Console sink: Spark prints each micro-batch of the UDF's output.
query = (
    sdf.select(to_upper("value"))
    .writeStream
    .format("console")
    .start()
)

In [30]:
# With no numeric columns, max() aggregates nothing and only the grouping column remains.
sdf.groupBy("value").max()

DataFrame[value: string]

In [17]:
# Expected fields of each JSON record in the text stream.
# NOTE(review): the dataset generated above holds floats in A (0.1, 0.2, 0.333...);
# with IntegerType such values probably won't parse — confirm the JSON source really carries integers.
schema = StructType([
    StructField(name, data_type, nullable=True)
    for name, data_type in (('A', IntegerType()), ('B', StringType()))
])

In [24]:
# Decode each text line as a JSON object and flatten its fields into top-level columns.
parsed_data = (
    sdf.select(from_json(sdf["value"].cast("string"), schema).alias("data"))
    .select("data.*")
)

In [23]:
# Console sink; the checkpoint directory records query progress so a restart can resume.
(
    parsed_data.writeStream
    .format('console')
    .option("checkpointLocation", "./checkpoint")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x1d9060a7508>

In [16]:
# query = sdf.writeStream \
#     .format("console") \
#     .start()

In [9]:
# Expected True: `sdf` was created with spark.readStream.
sdf.isStreaming

True

In [25]:
# End of the JSON section: stop its streaming queries but keep the session alive.
# The CSV section below still uses `spark` (conf.set, createDataFrame, readStream);
# calling spark.stop() here made those cells fail on Restart & Run All.
for active_query in spark.streams.active:
    active_query.stop()

## CSV

In [8]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
# Enable Arrow for pandas -> Spark conversion (used by createDataFrame below).
# Spark 3.0 renamed this setting and deprecated the old key, so pick the key
# that matches the installed version.
_arrow_conf_key = (
    "spark.sql.execution.arrow.pyspark.enabled"
    if int(pyspark.__version__.split('.')[0]) >= 3
    else "spark.sql.execution.arrow.enabled"
)
spark.conf.set(_arrow_conf_key, "true")

In [9]:
# Generate a reproducible pandas DataFrame of 100 x 3 uniform floats in [0, 1).
# A fixed seed keeps the data identical across Restart & Run All.
SEED = 42
pdf = pd.DataFrame(np.random.default_rng(SEED).random((100, 3)))

# Convert the pandas frame to a Spark DataFrame (Arrow-accelerated when enabled in the previous cell).
# NOTE(review): this rebinds `df`, which earlier held the pandas job table.
df = spark.createDataFrame(data=pdf)

In [10]:
# Layout of the job files written in the "Make dataset" section: two quoted
# string columns per line, written without a header row.
schema = StructType([
    StructField('JobID', StringType(), nullable=True),
    StructField('JobData', StringType(), nullable=True)
])

# header=False: the job files are written with to_csv(header=False). With
# header=True Spark treats the first line of every file as a header and skips
# it, which silently dropped the single job line in each file.
sdf = spark.readStream.csv('./Data', header=False, schema=schema)

In [20]:
@pandas_udf("JobID string, JobData string", PandasUDFType.GROUPED_MAP)
def func(pdf):
    """Grouped-map pandas UDF: receives all rows of one JobID group as a pandas DataFrame.

    Currently an identity transform that returns the group unchanged.
    TODO: emit a result column, e.g.
    ``pdf.assign(JobResult=pdf.JobData).drop(['JobData'], axis=1)``; the declared
    output schema would then need ``JobResult`` in place of ``JobData``.
    """
    return pdf


# NOTE(review): this rebinds `func`, shadowing the DStream line parser defined earlier.
grouped_stream = sdf.groupBy("JobID").apply(func)
query = (
    grouped_stream.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

In [159]:
# sdf.rdd.map(lambda x: x).writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .start()

In [160]:
# sdf.groupBy("JobID").max().writeStream \
#     .format("console") \
#     .start()

# Waiting on an asyncio Event

In [69]:
import asyncio, nest_asyncio
# Jupyter/IPython already runs an asyncio event loop, so calling asyncio.run()
# from a cell raises a RuntimeError (e.g. "This event loop is already running").
# nest_asyncio patches asyncio so that nested use is allowed.
nest_asyncio.apply()

In [80]:
async def waiter(event):
    """Announce, suspend until ``event`` is set, then announce completion.

    Args:
        event: an ``asyncio.Event`` belonging to the running event loop.
    """
    before, after = 'waiting for it ...', '... got it!'
    print(before)
    await event.wait()
    print(after)

async def main():
    """Demo: start ``waiter`` as a task, set its event after one second, then join the task."""
    ready = asyncio.Event()
    pending = asyncio.create_task(waiter(ready))

    # Give the waiter time to start and block on the event before releasing it.
    await asyncio.sleep(1)
    ready.set()

    # Join: completes once the waiter has printed its second message.
    await pending

# Run the demo; inside Jupyter this relies on nest_asyncio.apply() having been called earlier.
asyncio.run(main())

waiting for it ...
... got it!
