In [1]:
cnf = sc.getConf()\
.set('spark.cores.max', '2')\
.set('spark.max.memory', '1g')\
.set('spark.executor.memory', '1g')
sc.stop()
sc = SparkContext(conf=cnf)
spark = SparkSession(sc)

In [2]:
# import pandas as pd
import os
from os.path import join as pjoin
from datetime import datetime as dt

from pyspark.sql.types import *
from pyspark.sql.functions import *

In [3]:
# df = spark.read.csv(data_dir, header=True, inferSchema=True)
# df = pd.read_csv(data_path, engine='python')
# df['type'].unique()

In [4]:
class DataVO(object):

    def __init__(self, **kwargs):
        self.stock_code = kwargs['stock_code']
        self.date_time = kwargs['date_time']
        self.change = kwargs['change']
        self.price = kwargs['price']
        self.volume = kwargs['volume']
        self.amount = kwargs['amount']
        self.typ = kwargs['typ']
    
    def __str__(self):
        return str(f"stock record for stock: {self.stock_code}, at time: {self.date_time}")

def ecapsulate(line: list):
    date_time = line[0]
    price = line[1]
    change = line[2]
    volume = line[3]
    amount = line[4]
    typ = line[5]
    stock_code = line[6]
    return DataVO(stock_code=stock_code, 
                 date_time=date_time, 
                 price=price, 
                 change=change, 
                 volume=volume, 
                 amount=amount, 
                 typ=typ)

## spark

In [5]:
# import pandas as pd
import os
from os.path import join as pjoin
from datetime import datetime as dt

from pyspark.sql.types import *
from pyspark.sql.functions import *

In [6]:
data_dir = '../../data'
data_path = pjoin(data_dir, '10stock_tick_data.csv')
os.listdir(data_dir)

['10stock_5min_data.csv',
 '10stock_tick_data.csv',
 '10stock_tick_data.csv.zip',
 'test_data.csv']

In [7]:
rdd = sc.textFile(data_path)
rdd.cache()

../../data/10stock_tick_data.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [8]:
rdd.take(10)

['datetime,price,change,volume,amount,type,stock_code',
 '2019-03-29 09:25:04,10.82,-0.09,816,882912,in,000001',
 '2019-03-29 09:30:00,10.92,0.1,136,148252,in,000001',
 '2019-03-29 09:30:03,10.84,-0.08,542,588171,mid,000001',
 '2019-03-29 09:30:06,10.84,0.0,45,48782,out,000001',
 '2019-03-29 09:30:10,10.88,0.04,40,43421,in,000001',
 '2019-03-29 09:30:12,10.87,-0.01,14,15218,mid,000001',
 '2019-03-29 09:30:16,10.88,0.01,154,167455,in,000001',
 '2019-03-29 09:30:21,10.9,0.02,20,21790,in,000001',
 '2019-03-29 09:30:24,10.88,-0.02,38,41414,out,000001']

In [9]:
# rdd = sc.parallelize(['2019-03-29 09:25:04,10.82,-0.09,816,882912,in,000001\n2019-03-29 09:30:00,10.92,0.1,136,148252,in,000001'])

In [10]:
# rdd.flatMap(lambda lines: lines.split('\n')).collect()

In [11]:
lines = rdd.flatMap(lambda lines: lines.split('\n'))
field_lines = lines.map(lambda line: line.split(','))
# cast string to column schema
def cast_types(line):
    try:
        l = [dt.strptime(line[0], '%Y-%m-%d %X'), float(line[1]), float(line[2]), float(line[3]), float(line[4]), line[5], line[6]]
    except ValueError as e:
        l = [dt(2000, 1, 1, 0, 0, 0) , -1.0,  -1.0, -1.0, -1.0, '-1', '-1']
    return l
field_lines_casted = field_lines.map(cast_types)

In [12]:
# field_lines_casted.collect()
# str(ecapsulate(range(7)))
# lines.map(lambda line: line.split(',')).map(lambda rec: str(ecapsulate(rec))).collect()
# lines.map(lambda line: line.split(',')).map(lambda rec: ecapsulate(rec)).map(lambda obj: obj.price).collect()

In [13]:
# create schema
fields = ['time','price','change','volume','amount','type','stock_code']
field_types = [TimestampType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), StringType(), StringType()]
sfields = []
for f, t in zip(fields, field_types):
    sfields.append(StructField(f, t, True))
schema = StructType(sfields)

In [14]:
# df = spark.createDataFrame(data=field_lines, schema=schema)
df = field_lines_casted.toDF(schema=schema)

In [15]:
df.show(10)

+-------------------+-----+------+------+--------+----+----------+
|               time|price|change|volume|  amount|type|stock_code|
+-------------------+-----+------+------+--------+----+----------+
|2000-01-01 00:00:00| -1.0|  -1.0|  -1.0|    -1.0|  -1|        -1|
|2019-03-29 09:25:04|10.82| -0.09| 816.0|882912.0|  in|    000001|
|2019-03-29 09:30:00|10.92|   0.1| 136.0|148252.0|  in|    000001|
|2019-03-29 09:30:03|10.84| -0.08| 542.0|588171.0| mid|    000001|
|2019-03-29 09:30:06|10.84|   0.0|  45.0| 48782.0| out|    000001|
|2019-03-29 09:30:10|10.88|  0.04|  40.0| 43421.0|  in|    000001|
|2019-03-29 09:30:12|10.87| -0.01|  14.0| 15218.0| mid|    000001|
|2019-03-29 09:30:16|10.88|  0.01| 154.0|167455.0|  in|    000001|
|2019-03-29 09:30:21| 10.9|  0.02|  20.0| 21790.0|  in|    000001|
|2019-03-29 09:30:24|10.88| -0.02|  38.0| 41414.0| out|    000001|
+-------------------+-----+------+------+--------+----+----------+
only showing top 10 rows



In [16]:
df.createOrReplaceTempView("stock")
# spark.sql('select t.stock_code, mean(t.price) as mean_price from stock t where t.volume > 150 group by t.stock_code').show()
spark.sql('select t.stock_code, mean(t.price) as mean_price from stock t where t.volume > 150 group by t.stock_code').show()

+----------+------------------+
|stock_code|        mean_price|
+----------+------------------+
|    000008|3.8767889908256885|
|    000006|  5.88334991708123|
|    000007| 8.551960326721115|
|    000011|11.003074204947003|
|    000010| 3.658470588235287|
|    000009| 4.743333333333331|
|    000001|11.003074204947003|
|    000002|26.504330808080837|
|    000005|3.0191596638655436|
+----------+------------------+



In [27]:
df.filter('volume > 150').groupby('stock_code').mean('price').withColumnRenamed('avg(price)', 'mean_price').show()

+----------+------------------+
|stock_code|        mean_price|
+----------+------------------+
|    000008|3.8767889908256885|
|    000006|  5.88334991708123|
|    000007| 8.551960326721115|
|    000011|11.003074204947003|
|    000010| 3.658470588235287|
|    000009| 4.743333333333331|
|    000001|11.003074204947003|
|    000002|26.504330808080837|
|    000005|3.0191596638655436|
+----------+------------------+



In [None]:
test = sc.parallelize(['2019-03-29 09:25:04'])
test.map(lambda x: dt.strptime('2019-03-29 09:25:04', '%Y-%m-%d %X')).collect()

## streaming

In [8]:
# import pandas as pd
import os
from os.path import join as pjoin
from pyspark.sql.types import *
from datetime import datetime as dt

In [18]:
data_dir = '../../data'
data_path = pjoin(data_dir, '10stock_tick_data.csv')
os.listdir(data_dir)

['10stock_5min_data.csv',
 '10stock_tick_data.csv',
 '10stock_tick_data.csv.zip',
 'test_data.csv']

In [19]:
from pyspark.streaming import StreamingContext

In [22]:
rdd = sc.textFile(data_path)
# file_length = rdd.map(lambda x: 1).reduce(lambda a,b: a+b)

In [None]:
# split the rdd into 5 equal-size parts
rddQueue = rdd.randomSplit(range(10), seed=5003)
# Create a StreamingContext with batch interval of 5 seconds
ssc = StreamingContext(sc, 5)
# Feed the rdd queue to a DStream
rdds = ssc.queueStream(rddQueue)

In [8]:
# create schema
fields = ['time','price','change','volume','amount','type','stock_code']
field_types = [TimestampType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), StringType(), StringType()]
sfields = []
for f, t in zip(fields, field_types):
    sfields.append(StructField(f, t, True))
schema = StructType(sfields)

# loading data
lines = rdds.flatMap(lambda lines: lines.split('\n'))
field_lines = lines.map(lambda line: line.split(','))
field_lines_casted = field_lines.map(lambda line: [dt.strptime(line[0], '%Y-%m-%d %X'), float(line[1]), float(line[2]), float(line[3]), float(line[4]), line[5], line[6]])

In [9]:
# Convert RDDs of the words DStream to DataFrame and run SQL query
def process(time, rdd):
    print("========= %s =========" % str(time))

    try:
        # Get the singleton instance of SparkSession
#         spark = getSparkSessionInstance(rdd.context.getConf())
        df = rdd.toDF(schema=schema)
        print(f"for time: {time}")
        print(f"{'-'*30}{time}{'-'*30}")
        df.createOrReplaceTempView("stock")
        ss.sql('select mean(t.price) as mean_price from stock t where t.volume > 150').show()
    except:
        pass

In [14]:
field_lines_casted.foreachRDD(process)

In [15]:
ssc.start()
ssc.awaitTermination(30)
ssc.stop(False)

for time: 2019-04-11 18:09:09
------------------------------2019-04-11 18:09:09------------------------------
for time: 2019-04-11 18:09:10
------------------------------2019-04-11 18:09:10------------------------------
for time: 2019-04-11 18:09:11
------------------------------2019-04-11 18:09:11------------------------------
for time: 2019-04-11 18:09:12
------------------------------2019-04-11 18:09:12------------------------------
for time: 2019-04-11 18:09:13
------------------------------2019-04-11 18:09:13------------------------------
for time: 2019-04-11 18:09:14
------------------------------2019-04-11 18:09:14------------------------------
for time: 2019-04-11 18:09:15
------------------------------2019-04-11 18:09:15------------------------------
for time: 2019-04-11 18:09:16
------------------------------2019-04-11 18:09:16------------------------------
for time: 2019-04-11 18:09:17
------------------------------2019-04-11 18:09:17------------------------------
for time: 

Py4JJavaError: An error occurred while calling o116.awaitTermination.
: java.lang.NullPointerException
	at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:126)
	at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1819)
	at org.apache.spark.rdd.RDD.unpersist(RDD.scala:217)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$3.apply(DStream.scala:458)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$3.apply(DStream.scala:457)
	at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
	at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:139)
	at org.apache.spark.streaming.dstream.DStream.clearMetadata(DStream.scala:457)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$5.apply(DStream.scala:470)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$5.apply(DStream.scala:470)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.streaming.dstream.DStream.clearMetadata(DStream.scala:470)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$clearMetadata$2.apply(DStreamGraph.scala:134)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$clearMetadata$2.apply(DStreamGraph.scala:134)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.streaming.DStreamGraph.clearMetadata(DStreamGraph.scala:134)
	at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:263)
	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:184)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)


for time: 2019-04-11 18:09:37
------------------------------2019-04-11 18:09:37------------------------------
for time: 2019-04-11 18:09:38
------------------------------2019-04-11 18:09:38------------------------------
for time: 2019-04-11 18:09:39
------------------------------2019-04-11 18:09:39------------------------------
for time: 2019-04-11 18:09:40
------------------------------2019-04-11 18:09:40------------------------------
for time: 2019-04-11 18:09:41
------------------------------2019-04-11 18:09:41------------------------------
for time: 2019-04-11 18:09:42
------------------------------2019-04-11 18:09:42------------------------------
for time: 2019-04-11 18:09:43
------------------------------2019-04-11 18:09:43------------------------------
for time: 2019-04-11 18:09:44
------------------------------2019-04-11 18:09:44------------------------------
for time: 2019-04-11 18:09:45
------------------------------2019-04-11 18:09:45------------------------------
