In [1]:
cnf = sc.getConf()\
.set('spark.cores.max', '4')\
.set('spark.max.memory', '2g')\
.set('spark.executor.memory', '2g')
sc.stop()
sc = SparkContext(conf=cnf)
spark = SparkSession(sc)

In [21]:
# import pandas as pd
%matplotlib inline
import os
from os.path import join as pjoin
from datetime import datetime as dt

import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import *

data_dir = '../../data'

##### some structures

In [2]:
# df = spark.read.csv(data_dir, header=True, inferSchema=True)
# df = pd.read_csv(data_path, engine='python')
# df['type'].unique()

In [3]:
class DataVO(object):

    def __init__(self, **kwargs):
        self.stock_code = kwargs['stock_code']
        self.date_time = kwargs['date_time']
        self.change = kwargs['change']
        self.price = kwargs['price']
        self.volume = kwargs['volume']
        self.amount = kwargs['amount']
        self.typ = kwargs['typ']
    
    def __str__(self):
        return str(f"stock record for stock: {self.stock_code}, at time: {self.date_time}")
    
    @staticmethod
    def get_schema() -> StructType:
        """
        Get DataFrame schema
        :return: StructType
        """
        fields = ['time', 'stock_code', 'price', 'change', 'volume', 'amount', 'type']
        field_types = [TimestampType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(),
                       DoubleType()]
        sfields = []
        for f, t in zip(fields, field_types):
            sfields.append(StructField(f, t, True))
        schema = StructType(sfields)
        return schema

def ecapsulate(line: list):
    date_time = line[0]
    price = line[1]
    change = line[2]
    volume = line[3]
    amount = line[4]
    typ = line[5]
    stock_code = line[6]
    return DataVO(stock_code=stock_code, 
                 date_time=date_time, 
                 price=price, 
                 change=change, 
                 volume=volume, 
                 amount=amount, 
                 typ=typ)

## spark

In [5]:
# import pandas as pd
import os
from os.path import join as pjoin
from datetime import datetime as dt

from pyspark.sql.types import *
from pyspark.sql.functions import *

In [6]:
data_dir = '../../data'
data_path = pjoin(data_dir, '10stock_tick_data.csv')
os.listdir(data_dir)

['10stock_5min_data.csv',
 '10stock_tick_data.csv',
 '10stock_tick_data.csv.zip',
 'test_data.csv']

In [7]:
rdd = sc.textFile(data_path)
rdd.cache()

../../data/10stock_tick_data.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [8]:
rdd.take(10)

['datetime,price,change,volume,amount,type,stock_code',
 '2019-03-29 09:25:04,10.82,-0.09,816,882912,in,000001',
 '2019-03-29 09:30:00,10.92,0.1,136,148252,in,000001',
 '2019-03-29 09:30:03,10.84,-0.08,542,588171,mid,000001',
 '2019-03-29 09:30:06,10.84,0.0,45,48782,out,000001',
 '2019-03-29 09:30:10,10.88,0.04,40,43421,in,000001',
 '2019-03-29 09:30:12,10.87,-0.01,14,15218,mid,000001',
 '2019-03-29 09:30:16,10.88,0.01,154,167455,in,000001',
 '2019-03-29 09:30:21,10.9,0.02,20,21790,in,000001',
 '2019-03-29 09:30:24,10.88,-0.02,38,41414,out,000001']

In [9]:
# rdd = sc.parallelize(['2019-03-29 09:25:04,10.82,-0.09,816,882912,in,000001\n2019-03-29 09:30:00,10.92,0.1,136,148252,in,000001'])

In [10]:
# rdd.flatMap(lambda lines: lines.split('\n')).collect()

In [11]:
lines = rdd.flatMap(lambda lines: lines.split('\n'))
field_lines = lines.map(lambda line: line.split(','))
# cast string to column schema
def cast_types(line):
    try:
        l = [dt.strptime(line[0], '%Y-%m-%d %X'), float(line[1]), float(line[2]), float(line[3]), float(line[4]), line[5], line[6]]
    except ValueError as e:
        l = [dt(2000, 1, 1, 0, 0, 0) , -1.0,  -1.0, -1.0, -1.0, '-1', '-1']
    return l
field_lines_casted = field_lines.map(cast_types)

In [12]:
# field_lines_casted.collect()
# str(ecapsulate(range(7)))
# lines.map(lambda line: line.split(',')).map(lambda rec: str(ecapsulate(rec))).collect()
# lines.map(lambda line: line.split(',')).map(lambda rec: ecapsulate(rec)).map(lambda obj: obj.price).collect()

In [13]:
# create schema
fields = ['time','price','change','volume','amount','type','stock_code']
field_types = [TimestampType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), StringType(), StringType()]
sfields = []
for f, t in zip(fields, field_types):
    sfields.append(StructField(f, t, True))
schema = StructType(sfields)

In [14]:
# df = spark.createDataFrame(data=field_lines, schema=schema)
df = field_lines_casted.toDF(schema=schema)

In [15]:
df.show(10)

+-------------------+-----+------+------+--------+----+----------+
|               time|price|change|volume|  amount|type|stock_code|
+-------------------+-----+------+------+--------+----+----------+
|2000-01-01 00:00:00| -1.0|  -1.0|  -1.0|    -1.0|  -1|        -1|
|2019-03-29 09:25:04|10.82| -0.09| 816.0|882912.0|  in|    000001|
|2019-03-29 09:30:00|10.92|   0.1| 136.0|148252.0|  in|    000001|
|2019-03-29 09:30:03|10.84| -0.08| 542.0|588171.0| mid|    000001|
|2019-03-29 09:30:06|10.84|   0.0|  45.0| 48782.0| out|    000001|
|2019-03-29 09:30:10|10.88|  0.04|  40.0| 43421.0|  in|    000001|
|2019-03-29 09:30:12|10.87| -0.01|  14.0| 15218.0| mid|    000001|
|2019-03-29 09:30:16|10.88|  0.01| 154.0|167455.0|  in|    000001|
|2019-03-29 09:30:21| 10.9|  0.02|  20.0| 21790.0|  in|    000001|
|2019-03-29 09:30:24|10.88| -0.02|  38.0| 41414.0| out|    000001|
+-------------------+-----+------+------+--------+----+----------+
only showing top 10 rows



In [16]:
df.createOrReplaceTempView("stock")
# spark.sql('select t.stock_code, mean(t.price) as mean_price from stock t where t.volume > 150 group by t.stock_code').show()
spark.sql('select t.stock_code, mean(t.price) as mean_price from stock t where t.volume > 150 group by t.stock_code').show()

+----------+------------------+
|stock_code|        mean_price|
+----------+------------------+
|    000008|3.8767889908256885|
|    000006|  5.88334991708123|
|    000007| 8.551960326721115|
|    000011|11.003074204947003|
|    000010| 3.658470588235287|
|    000009| 4.743333333333331|
|    000001|11.003074204947003|
|    000002|26.504330808080837|
|    000005|3.0191596638655436|
+----------+------------------+



In [27]:
df.filter('volume > 150').groupby('stock_code').mean('price').withColumnRenamed('avg(price)', 'mean_price').show()

+----------+------------------+
|stock_code|        mean_price|
+----------+------------------+
|    000008|3.8767889908256885|
|    000006|  5.88334991708123|
|    000007| 8.551960326721115|
|    000011|11.003074204947003|
|    000010| 3.658470588235287|
|    000009| 4.743333333333331|
|    000001|11.003074204947003|
|    000002|26.504330808080837|
|    000005|3.0191596638655436|
+----------+------------------+



In [None]:
test = sc.parallelize(['2019-03-29 09:25:04'])
test.map(lambda x: dt.strptime('2019-03-29 09:25:04', '%Y-%m-%d %X')).collect()

## streaming

In [105]:
# import pandas as pd
import os
from os.path import join as pjoin
from pyspark.sql.types import *
from datetime import datetime as dt
from pyspark.streaming import StreamingContext

In [106]:
data_dir = '../../data'
data_path = pjoin(data_dir, '10stock_tick_data.csv')
os.listdir(data_dir)

['kmeans_data.txt',
 '10stock_5min_data.csv',
 '10stock_tick_data.csv',
 '10stock_tick_data.csv.zip',
 'streaming_kmeans_data_test.txt',
 '997stock_3day_tick_data.csv',
 'kmeans_data_test.txt',
 'test_data.csv']

In [151]:
rdd = sc.textFile(data_path)
# file_length = rdd.map(lambda x: 1).reduce(lambda a,b: a+b)

In [152]:
# split the rdd into 5 equal-size parts
rddQueue = rdd.randomSplit(range(10), seed=5003)
# Create a StreamingContext with batch interval of 5 seconds
ssc = StreamingContext(sc, 5)
# Feed the rdd queue to a DStream
rdds = ssc.queueStream(rddQueue)

In [153]:
# create schema
fields = ['time','price','change','volume','amount','type','stock_code']
field_types = [TimestampType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), StringType(), StringType()]
sfields = []
for f, t in zip(fields, field_types):
    sfields.append(StructField(f, t, True))
schema = StructType(sfields)

# loading data
lines = rdds.flatMap(lambda lines: lines.split('\n'))
field_lines = lines.map(lambda line: line.split(','))
field_lines_casted = field_lines.map(lambda line: [dt.strptime(line[0], '%Y-%m-%d %X'), float(line[1]), float(line[2]), float(line[3]), float(line[4]), line[5], line[6]])

In [154]:
# Convert RDDs of the words DStream to DataFrame and run SQL query
def process(time, rdd):
    print("========= %s =========" % str(time))

    try:
        # Get the singleton instance of SparkSession
#         spark = getSparkSessionInstance(rdd.context.getConf())
        df = rdd.toDF(schema=schema)
        print(f"for time: {time}")
        print(f"{'-'*30}{time}{'-'*30}")
        df.createOrReplaceTempView("stock")
        ss.sql('select mean(t.price) as mean_price from stock t where t.volume > 150').show()
    except:
        pass

In [14]:
field_lines_casted.foreachRDD(process)

In [15]:
ssc.start()
ssc.awaitTermination(30)
ssc.stop(False)

for time: 2019-04-11 18:09:09
------------------------------2019-04-11 18:09:09------------------------------
for time: 2019-04-11 18:09:10
------------------------------2019-04-11 18:09:10------------------------------
for time: 2019-04-11 18:09:11
------------------------------2019-04-11 18:09:11------------------------------
for time: 2019-04-11 18:09:12
------------------------------2019-04-11 18:09:12------------------------------
for time: 2019-04-11 18:09:13
------------------------------2019-04-11 18:09:13------------------------------
for time: 2019-04-11 18:09:14
------------------------------2019-04-11 18:09:14------------------------------
for time: 2019-04-11 18:09:15
------------------------------2019-04-11 18:09:15------------------------------
for time: 2019-04-11 18:09:16
------------------------------2019-04-11 18:09:16------------------------------
for time: 2019-04-11 18:09:17
------------------------------2019-04-11 18:09:17------------------------------
for time: 

Py4JJavaError: An error occurred while calling o116.awaitTermination.
: java.lang.NullPointerException
	at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:126)
	at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1819)
	at org.apache.spark.rdd.RDD.unpersist(RDD.scala:217)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$3.apply(DStream.scala:458)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$3.apply(DStream.scala:457)
	at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
	at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:139)
	at org.apache.spark.streaming.dstream.DStream.clearMetadata(DStream.scala:457)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$5.apply(DStream.scala:470)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$clearMetadata$5.apply(DStream.scala:470)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.streaming.dstream.DStream.clearMetadata(DStream.scala:470)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$clearMetadata$2.apply(DStreamGraph.scala:134)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$clearMetadata$2.apply(DStreamGraph.scala:134)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.streaming.DStreamGraph.clearMetadata(DStreamGraph.scala:134)
	at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:263)
	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:184)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)


for time: 2019-04-11 18:09:37
------------------------------2019-04-11 18:09:37------------------------------
for time: 2019-04-11 18:09:38
------------------------------2019-04-11 18:09:38------------------------------
for time: 2019-04-11 18:09:39
------------------------------2019-04-11 18:09:39------------------------------
for time: 2019-04-11 18:09:40
------------------------------2019-04-11 18:09:40------------------------------
for time: 2019-04-11 18:09:41
------------------------------2019-04-11 18:09:41------------------------------
for time: 2019-04-11 18:09:42
------------------------------2019-04-11 18:09:42------------------------------
for time: 2019-04-11 18:09:43
------------------------------2019-04-11 18:09:43------------------------------
for time: 2019-04-11 18:09:44
------------------------------2019-04-11 18:09:44------------------------------
for time: 2019-04-11 18:09:45
------------------------------2019-04-11 18:09:45------------------------------


## clustering --> aggregation --> regression

In [3]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD

from pyspark.sql.functions import monotonically_increasing_id, lag
from pyspark.sql.window import Window

import os
import sys
import pandas as pd
from scipy import stats
from datetime import timedelta
from datetime import datetime as dt
from os.path import join as pjoin
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime as dt
from pyspark.streaming import StreamingContext

##### create / preprocess data

sort the data by `time asc, stock_code asc` using codes below:
```
##### use pandas to do the time column refactor

def refactorTime(t):
#    formant = '%Y-%m-%d %H:%M:%S'
#    tf = dt.strptime(t, formant)
    tf = t
    sec = tf.second
    res = sec % 5
    if res >= 3:
        td = timedelta(seconds=(5 - res))
        tf += td
    else:
        td = timedelta(seconds=(res))
        tf -= td
    return tf #tf.strftime(formant)

data_pd = pd.read_csv(pjoin(data_dir, '997stock_3day_tick_daata.csv'))
data_pd = data_pd.sort_values(['time', 'stock_code'])
data_pd.reset_index(drop=True, inplace=True)
data_pd['time'] = pd.to_datetime(data_pd['time'])
data_pd['time'] = data_pd['time'].apply(refactorTime)
data_pd.to_csv(pjoin(data_dir, '997stock_3day_tick_data_sorted_refactor_time.csv'), index=False)
# data_pd.groupby(by=['time', 'stock_code'], as_index=False).agg(np.average)
```

group by `'time', 'stock_code'` using code belos

```
##### use spark to do the group by

fields = ['time','price','change','volume','amount','type','stock_code']
field_types = [TimestampType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType()]
sfields = []
for f, t in zip(fields, field_types):
    sfields.append(StructField(f, t, True))
schema = StructType(sfields)

data_df = spark.read.csv(pjoin(data_dir, '997stock_3day_tick_data_sorted_refactor_time.csv'), schema=schema, header=True)

data_df = data_df.groupBy(col('time'), col('stock_code'))\
.avg('price', 'change', 'volume', 'amount', 'type')\
.select(col('time'), col('stock_code'),
        bround('avg(price)', 2).alias('price'), 
        bround('avg(change)', 2).alias('change'),
        bround('avg(volume)', 2).alias('volume'),
        bround('avg(amount)', 2).alias('amount'),
        bround('avg(type)', 2).alias('type'))\
.orderBy(col('time'), col('stock_code'), ascending=[1, 1])
data_df.cache()
```

saving data by Pandas

```
##### collect data & save

data_pd = data_df.toPandas()
data_pd = data_pd.query("time>'2018-12-10 09:30:00'").reset_index(drop=True)
data_pd.to_csv(pjoin(data_dir, '997stock_3day_tick_data_sortby_time.csv'), index=False)
# data_pd.to_csv(pjoin(data_dir, '997stock_3day_tick_data_sortby_time.csv'), index=False)
```

In [23]:
data_pd = pd.read_csv(pjoin(data_dir, '997stock_3day_tick_data_sortby_time.csv'))
data_pd = data_pd.sort_values(['time', 'stock_code'])
data_pd.reset_index(drop=True, inplace=True)
data_pd['time'] = pd.to_datetime(data_pd['time'])
# data_pd['time'] = data_pd['time'].apply(refactorTime)
# data_pd.to_csv(pjoin(data_dir, '997stock_3day_tick_data_sorted_refactor_time.csv'), index=False)

In [96]:
time_idx = pd.date_range(start='2018-12-10 09:25:05', end='2018-12-10 15:00:00', freq='5s')
time_idx = time_idx.append(pd.date_range(start='2018-12-11 09:25:05', end='2018-12-11 15:00:00', freq='5s'))
time_idx_df = pd.DataFrame(time_idx, columns=['time'])
time_idx_df.set_index('time', inplace=True)
data_pd_t = data_pd.set_index('time')

In [110]:
data_pd_small = data_pd.query('stock_code < 100').reset_index(drop=True)
# data_pd_small.to_csv(pjoin(data_dir, '997stock_3day_tick_data_sortby_time_small.csv'), index=False)

In [125]:
data_pd = data_pd.query("time>'2018-12-10 09:30:00'").reset_index(drop=True)
data_pd.to_csv(pjoin(data_dir, '997stock_3day_tick_data_sortby_time.csv'), index=False)

In [127]:
data_pd.query("stock_code == 1")

Unnamed: 0,time,stock_code,price,change,volume,amount,type
0,2018-12-10 09:30:05,1.0,10.21,0.00,3229.0,3297594.5,0.0
973,2018-12-10 09:30:10,1.0,10.21,0.00,111.0,113295.0,1.0
1710,2018-12-10 09:30:15,1.0,10.21,0.00,54.0,55099.5,1.0
2380,2018-12-10 09:30:20,1.0,10.20,0.00,625.0,638000.0,0.0
3135,2018-12-10 09:30:25,1.0,10.20,0.00,81.0,82644.5,0.0
3846,2018-12-10 09:30:30,1.0,10.21,0.01,109.0,111287.0,1.0
4442,2018-12-10 09:30:35,1.0,10.20,0.00,239.0,243984.5,0.0
5207,2018-12-10 09:30:40,1.0,10.20,0.00,376.0,383506.5,0.0
5948,2018-12-10 09:30:45,1.0,10.21,0.00,58.0,59187.0,1.0
6579,2018-12-10 09:30:50,1.0,10.20,0.00,106.5,108593.5,-0.5


run command below to create a smaller version of data file to do the test

`head -100000 997stock_3day_tick_data_sortby_time.csv > 997stock_3day_tick_data_sortby_time_small.csv`

##### load data as rdd

###### rdd functions

In [7]:
from itertools import islice

def refactorTime(t):
    formant = '%Y-%m-%d %H:%M:%S'
    tf = dt.strptime(t, formant)
    return tf #tf.strftime(formant)

def flattenLine(v):
    r = []
#     r.append(v[0])
    for l in v:
        if isinstance(l, list) or isinstance(l, tuple):
            r.extend(flattenLine(l))
        else:
            r.append(l)
    return r

def split(s):
    kvs = s.split(',')
    # make seconds to 0 or 30.
    k = refactorTime(kvs[0])
    vs = [float(item) for item in kvs[1:]]
    return [k, vs]

def rddq(rdd, keys, st, ed):
    if (ed > len(keys)):
        ed = len(keys) - 1
        return rdd.filter(lambda v: v[0] >= keys[st] and v[0] <= keys[ed]).map(flattenLine)
    else:
        return rdd.filter(lambda v: v[0] >= keys[st] and v[0] < keys[ed]).map(flattenLine)
    
def rddg(rdd, keys, step):
    for i in range(0, len(keys), step):
        yield rddq(rdd, keys, i, i+step).map(flattenLine)
        

###### rdd operations

In [8]:
rdd = sc.textFile(pjoin(data_dir, '997stock_3day_tick_data_sortby_time_small.csv'))
rdd.cache()
rdd = rdd.mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)
rdd = rdd.map(split)

keynum = rdd.keys().distinct().count()
keys = sorted(rdd.keys().distinct().collect())

In [9]:
rddQueue = list(rddg(rdd, keys, 3))
print(rddQueue[0].collect()[-1])
print(rddQueue[1].take(1))
# print(keys)
print(len(rddQueue))

[datetime.datetime(2018, 12, 10, 9, 30, 5), 2539.0, 4.57, 0.02, 39.0, 17834.0, 0.5]
[[datetime.datetime(2018, 12, 10, 9, 30, 10), 1.0, 10.21, 0.0, 111.0, 113295.0, 1.0]]
49


###### dataFrame operations for API test

`please ignore this part`

In [391]:
rdd_test = rddQueue[0]

In [392]:
# create schema
fields = ['time','stock_code', 'price','change','volume','amount','type']
field_types = [TimestampType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType()]
sfields = []
for f, t in zip(fields, field_types):
    sfields.append(StructField(f, t, True))
schema = StructType(sfields)

# df = spark.createDataFrame(data=rdd, schema=schema)
df = rdd_test.toDF(schema=schema)
df.show()

+-------------------+----------+-----+------+------+---------+----+
|               time|stock_code|price|change|volume|   amount|type|
+-------------------+----------+-----+------+------+---------+----+
|2018-12-10 09:25:05|       1.0|10.22| -0.06|8905.0|9101002.0|-1.0|
|2018-12-10 09:25:05|       2.0|25.12| -0.22|2522.0|6335264.0|-1.0|
|2018-12-10 09:25:05|       5.0|  3.0| -0.02|  47.0|  14100.0|-1.0|
|2018-12-10 09:25:05|       6.0| 5.66| -0.05| 639.0| 361674.0|-1.0|
|2018-12-10 09:25:05|       7.0| 7.83| -0.17|1035.0| 810405.0| 1.0|
|2018-12-10 09:25:05|       8.0| 3.89| -0.03| 569.0| 221341.0|-1.0|
|2018-12-10 09:25:05|       9.0| 4.78|   0.0| 256.0| 122368.0|-1.0|
|2018-12-10 09:25:05|      10.0| 3.84| -0.05|1153.0| 442752.0|-1.0|
|2018-12-10 09:25:05|      11.0| 10.5| -0.08|1386.0|1455300.0|-1.0|
|2018-12-10 09:25:05|      12.0|  4.2| -0.01| 152.0|  63840.0|-1.0|
|2018-12-10 09:25:05|      14.0|10.23| -0.24|4792.0|4902216.0|-1.0|
|2018-12-10 09:25:05|      16.0| 3.65| -0.02| 23

In [393]:
# do aggregation

# df.createOrReplaceTempView("curr")
# df = spark.sql("select t.time as time, " +
# "t.stock_code as stock_code, " +
# "round(avg(t.price),2) as price, " +
# "round(avg(t.change),2) as change, " +
# "round(avg(t.volume),2) as volume, " +
# "round(avg(t.amount),2) as amount, " +
# "round(avg(t.type),2) as type " +
# "from curr t " +
# "group by t.time, t.stock_code " + 
# "order by t.stock_code asc, t.time asc")
# df.show()

df = df.groupBy(col('time'), col('stock_code'))\
.avg('price', 'change', 'volume', 'amount', 'type')\
.select(col('time'), col('stock_code'),
        bround('avg(price)', 2).alias('price'), 
        bround('avg(change)', 2).alias('change'),
        bround('avg(volume)', 2).alias('volume'),
        bround('avg(amount)', 2).alias('amount'),
        bround('avg(type)', 2).alias('type'))\
.orderBy(col('stock_code'), col('time'), ascending=[1, 1])

df.show()

+-------------------+----------+-----+------+------+---------+----+
|               time|stock_code|price|change|volume|   amount|type|
+-------------------+----------+-----+------+------+---------+----+
|2018-12-10 09:25:05|       1.0|10.22| -0.06|8905.0|9101002.0|-1.0|
|2018-12-10 09:30:05|       1.0|10.21|   0.0|3229.0|3297594.5| 0.0|
|2018-12-10 09:25:05|       2.0|25.12| -0.22|2522.0|6335264.0|-1.0|
|2018-12-10 09:30:00|       2.0|25.12|   0.0|  49.0| 123129.0|-1.0|
|2018-12-10 09:30:05|       2.0|25.16|  0.01| 379.5| 954694.0| 0.0|
|2018-12-10 09:30:05|       4.0|16.17| -0.06|   7.5|  12145.5|-1.0|
|2018-12-10 09:25:05|       5.0|  3.0| -0.02|  47.0|  14100.0|-1.0|
|2018-12-10 09:30:05|       5.0| 2.99| -0.01| 113.0|  33698.5|-1.0|
|2018-12-10 09:25:05|       6.0| 5.66| -0.05| 639.0| 361674.0|-1.0|
|2018-12-10 09:30:05|       6.0| 5.64| -0.01| 982.5| 555991.5|-1.0|
|2018-12-10 09:25:05|       7.0| 7.83| -0.17|1035.0| 810405.0| 1.0|
|2018-12-10 09:30:00|       7.0| 7.83|   0.0|  2

In [394]:
w = Window.partitionBy("stock_code").orderBy('time')#.rangeBetween(-3, 1)
lead1_Y = lead(col('price'), count=1).over(w)
df_fe = df.withColumn('lead1-Y', lead1_Y)

def append_lags(df_fe, i):
    except_set = set(['time', 'stock_code'])
    for c in df.columns:
        if c in except_set:
            continue
        df_fe = df_fe.withColumn(f"{c}-lag{i}", lag(col(c), count=i).over(w))
    return df_fe

for i in range(2):
    df_fe = append_lags(df_fe, i+1)

df_fe = df_fe.orderBy(col('stock_code'), col('time'), ascending=[1, 1])
df_fe.show()

+-------------------+----------+-----+------+------+---------+----+-------+----------+-----------+-----------+-----------+---------+----------+-----------+-----------+-----------+---------+
|               time|stock_code|price|change|volume|   amount|type|lead1-Y|price-lag1|change-lag1|volume-lag1|amount-lag1|type-lag1|price-lag2|change-lag2|volume-lag2|amount-lag2|type-lag2|
+-------------------+----------+-----+------+------+---------+----+-------+----------+-----------+-----------+-----------+---------+----------+-----------+-----------+-----------+---------+
|2018-12-10 09:25:05|       1.0|10.22| -0.06|8905.0|9101002.0|-1.0|  10.21|      null|       null|       null|       null|     null|      null|       null|       null|       null|     null|
|2018-12-10 09:30:05|       1.0|10.21|   0.0|3229.0|3297594.5| 0.0|   null|     10.22|      -0.06|     8905.0|  9101002.0|     -1.0|      null|       null|       null|       null|     null|
|2018-12-10 09:25:05|       2.0|25.12| -0.22|2522.

In [395]:
# ks = df.columns
def map_df_to_rdd(v):
    d = v.asDict()
    r = []
    for k in d:
        r.append(v[k])
    return r

# transform back to rdd
df_fe.rdd.map(map_df_to_rdd).take(5)

[[datetime.datetime(2018, 12, 10, 9, 25, 5),
  1.0,
  10.22,
  -0.06,
  8905.0,
  9101002.0,
  -1.0,
  10.21,
  None,
  None,
  None,
  None,
  None,
  None,
  None,
  None,
  None,
  None],
 [datetime.datetime(2018, 12, 10, 9, 30, 5),
  1.0,
  10.21,
  0.0,
  3229.0,
  3297594.5,
  0.0,
  None,
  10.22,
  -0.06,
  8905.0,
  9101002.0,
  -1.0,
  None,
  None,
  None,
  None,
  None],
 [datetime.datetime(2018, 12, 10, 9, 25, 5),
  2.0,
  25.12,
  -0.22,
  2522.0,
  6335264.0,
  -1.0,
  25.12,
  None,
  None,
  None,
  None,
  None,
  None,
  None,
  None,
  None,
  None],
 [datetime.datetime(2018, 12, 10, 9, 30),
  2.0,
  25.12,
  0.0,
  49.0,
  123129.0,
  -1.0,
  25.16,
  25.12,
  -0.22,
  2522.0,
  6335264.0,
  -1.0,
  None,
  None,
  None,
  None,
  None],
 [datetime.datetime(2018, 12, 10, 9, 30, 5),
  2.0,
  25.16,
  0.01,
  379.5,
  954694.0,
  0.0,
  None,
  25.12,
  0.0,
  49.0,
  123129.0,
  -1.0,
  25.12,
  -0.22,
  2522.0,
  6335264.0,
  -1.0]]

In [235]:
# r = df_fe.rdd.map(lambda v: v).take(5)
# r = r[0]
# r = r.asDict()
# # for v in r:
# #     print(v)

##### feature engineer TRANSFORMs

In [10]:
# load data as rdd
# rdd = sc.textFile(pjoin(data_dir, 'kmeans_data_test.txt'))
# rddQueue = rdd.randomSplit(np.ones(200), seed=5003)
batchInterval = 3
rddQueue = list(rddg(rdd, keys, batchInterval))
# make rdd as stream
ssc = StreamingContext(sc, batchInterval)
dataStream = ssc.queueStream(rddQueue)

In [11]:
# fe
fields = ['time','stock_code','price','change','volume','amount','type']
field_types = [TimestampType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType()]
sfields = []
for f, t in zip(fields, field_types):
    sfields.append(StructField(f, t, True))
schema = StructType(sfields)

def append_lags(df_fe, fields, w, i):
    except_set = set(['time', 'stock_code'])
    for c in fields:#df.columns:
        if c in except_set:
            continue
        df_fe = df_fe.withColumn(f"{c}-lag{i}", lag(col(c), count=i).over(w))
    return df_fe

def map_df_to_rdd(v):
    d = v.asDict()
    r = []
    for k in d:#fields:
        print(k)
        r.append(v[k])
    return r

def feature_builder(rdd, schema):
    # 1. aggregation
    # print((rdd.take(5)))
    df = rdd.toDF(schema=schema)    
    df = df.groupBy(col('time'), col('stock_code'))\
    .avg('price', 'change', 'volume', 'amount', 'type')\
    .select(col('time'), col('stock_code'),
            bround('avg(price)', 2).alias('price'), 
            bround('avg(change)', 2).alias('change'),
            bround('avg(volume)', 2).alias('volume'),
            bround('avg(amount)', 2).alias('amount'),
            bround('avg(type)', 2).alias('type'))\
    .orderBy(col('stock_code'), col('time'), ascending=[1, 1])
    
    # 2. shift features
    Y_COL_NAME = 'lead1-Y'
    w = Window.partitionBy("stock_code").orderBy('time')#.rangeBetween(3, 1)
    lead1_Y = lead(col('price'), count=1).over(w)
    df_fe = df.withColumn(Y_COL_NAME, lead1_Y)
    for i in range(batchInterval-1): #2): # shiftting time span, better to be the batchInterval-1 
        df_fe = append_lags(df_fe, schema.names, w, i+1)
    df_fe = df_fe.orderBy(col('stock_code'), col('time'), ascending=[1, 1])
    
    # 3. move lable column Y (lead1-Y) to the last column
    cols = df_fe.columns
    cols.remove(Y_COL_NAME)
    cols.append(Y_COL_NAME)
    df_fe = df_fe.select(cols)
    # df_fe.show()

    # 4. transback to rdd
    rdd_out = df_fe.rdd.map(map_df_to_rdd)
        
    # 5. drop rows with None feature values after shift. Except for lead-Y, which is the last col of the rdd
    rdd_out = rdd_out.filter(lambda v: None not in v[:-1])
    
    # 6. drop time column, which is the first column.
#     rdd_out = rdd_out.map(lambda v: v[1:])
    
    return rdd_out

# using slide window to ensure the new coming data have enough predecessor to shift.
# window width is twice as the batchInterval where each interval slide only 1 time of the batchInterval
dataStream = dataStream.window(batchInterval*2, batchInterval)
dataStream = dataStream.transform(lambda rdd: feature_builder(rdd, schema))
dataStream = dataStream.cache()
# after transform shiftting features, drop the old useless data
# dataStream = dataStream.window(batchInterval)
# dataStream.pprint()

##### split train/pred data stream for KMEANS

In [12]:
# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def cluster_parse_pred(lp):
    feature_start_idx = 2
    # stock id; the 0th is time
    label = lp[1] # float(lp[lp.find('(') + 1: lp.find(')')])
    # features
    vec = Vectors.dense(lp[feature_start_idx:]) # Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

def cluster_parse_train(lp):
    feature_start_idx = 2
    # only features for clustering. here include the training Y but filterd None before call this function
    vec = Vectors.dense(lp[feature_start_idx:]) # Vectors.dense([float(x) for x in lp[lp.find('[') + 1: lp.find(']')].split(',')])
    return vec

In [13]:
# separate to 2 streams
nonone_datastream = dataStream.filter(lambda v: None not in v)
trainS = nonone_datastream.map(cluster_parse_train)
predS = nonone_datastream.map(cluster_parse_pred).map(lambda lp: (lp.label, lp.features))

# trainS.pprint()
# predS.pprint()

##### clustering

1. **dim** may need to change due the different feature Engineering**
2. since 1 label could have **multiple** lines of data (different time), may result in same label belong to different clusters

In [14]:
# ssc.checkpoint('cp')
# initial streaming clustering model
# here dim may change due the different feature Engineering; 
# for now is features except (time, stock_code) then shift (batchInterval-1) times combine with current value with another one future price.
dim = (len(schema.names) - 2)*batchInterval + 1
numClusters = 2
model = StreamingKMeans(k=numClusters, decayFactor=1.0).setRandomCenters(dim, 1.0, 5003) # dim weight seed

In [15]:
# train & pred on streams get result: dstream ==> (label, clusterID)
model.trainOn(trainS)
result = model.predictOnValues(predS)
# the result will be like same label have multiple clusters, which each row belongs to 1 timepoint.
# therefore, requring a groupby to eliminate the duplicated labels (stock_code). 
# using mode as cluster for the stock; require scipy.stats
result = result.groupByKey().mapValues(lambda v: float(stats.mode(list(v)).mode[0]))
# result.pprint()
# result.foreachRDD(lambda rd: print(rd.lookup(1)))

##### aggregation by cls & time

In [16]:
# swap time & stock_code, then make stock_code as key
kstream = dataStream.map(lambda v: [v[1], v[0], v[1:]]).map(flattenLine).map(lambda v: (v[0], v[1:]))
# kstream.pprint()
# join the datapoin with label
# (stock_code, (cluster_id, [time, stock_code, features:list, label]))
jstream = result.join(kstream)
# [cluster_id, time, stock_code, features:list, label], remove extra stock_code
jstream = jstream.map(flattenLine).map(lambda v: v[1:])
# jstream.pprint()

rfields = ['cluster_id', 'time', 'stock_code', 
          'price', 'change', 'volume', 'amount', 'type', 
          'price-lag1', 'change-lag1', 'volume-lag1', 'amount-lag1', 'type-lag1', 
          'price-lag2', 'change-lag2', 'volume-lag2', 'amount-lag2', 'type-lag2',
          'lead1-Y']
rfield_types = [DoubleType(), TimestampType(), DoubleType(), 
               DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(),
               DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), 
               DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType(), 
               DoubleType()]
rFields = []
for f, t in zip(rfields, rfield_types):
    rFields.append(StructField(f, t, True))
schema_agg = StructType(rFields)

# aggregation & feature engineering for Regression model.
def agg_fe_by_clusters_time(rdd, schema):
    try:
        df = rdd.toDF(schema=schema)
        df = df.groupBy(col('cluster_id'), col('time'))\
        .avg('price', 'change', 'volume', 'amount', 'type', 
             'price-lag1', 'change-lag1', 'volume-lag1', 'amount-lag1', 'type-lag1',
             'price-lag2', 'change-lag2', 'volume-lag2', 'amount-lag2', 'type-lag2',
             'lead1-Y')\
        .select(col('cluster_id'), col('time'),
                bround('avg(price)', 2).alias('price'), 
                bround('avg(change)', 2).alias('change'),
                bround('avg(volume)', 2).alias('volume'),
                bround('avg(amount)', 2).alias('amount'),
                bround('avg(type)', 2).alias('type'),
                bround('avg(price-lag1)', 2).alias('price-lag1'), 
                bround('avg(change-lag1)', 2).alias('change-lag1'),
                bround('avg(volume-lag1)', 2).alias('volume-lag1'),
                bround('avg(amount-lag1)', 2).alias('amount-lag1'),
                bround('avg(type-lag1)', 2).alias('type-lag1'),
                bround('avg(price-lag2)', 2).alias('price-lag2'), 
                bround('avg(change-lag2)', 2).alias('change-lag2'),
                bround('avg(volume-lag2)', 2).alias('volume-lag2'),
                bround('avg(amount-lag2)', 2).alias('amount-lag2'),
                bround('avg(type-lag2)', 2).alias('type-lag2'),
                bround('avg(lead1-Y)', 2).alias('lead1-Y'))\
        .orderBy(col('cluster_id'), col('time'), ascending=[1, 1])
        # df.show()
        rdd_out = df.rdd.map(map_df_to_rdd)
        return rdd_out
    except Exception as e:
        print(e)
        return rdd

rdatastream = jstream.transform(lambda rdd: agg_fe_by_clusters_time(rdd, schema_agg))
rdatastream = rdatastream.cache()
# rdatastream.pprint()

##### split data for clusters. stream branches

In [17]:
# (y, vec) parser
def rparse(lp):
    y = -1 if lp[-1] is None else lp[-1]
    vec = Vectors.dense(lp[:-1])
    return LabeledPoint(y, vec)

# must use a function to divide the branches of stream 
# to avoid using the the global varibale `cls` accidently
def branch(gstream, cls):
    # 0: cluster_id; 1: time; 2:-1 -> features; -1: Y(label)
    return gstream.filter(lambda v: v[0] == cls).map(lambda v: rparse(v[2:]))
    
    
# separate steamdata for different cluster
cls_streams = dict()
for cls in range(numClusters):
    clsstream = branch(rdatastream, cls)
    cls_streams[cls] = clsstream
    clsstream.cache()
#     clsstream.pprint()

##### regression

In [18]:
# split train/pred data stream for REGRESSION
def rparse_pred(stream):
    # none label row is the row to predict
    return stream.filter(lambda v: v.label == -1).map(lambda v: (v.label, v.features))

def rparse_train(stream):
    # rows with labels are the rows to train
    return stream.filter(lambda v: v.label != -1)#.map(lambda v: (v.label, v.features))

# features exclude colume 'cluster_id', 'time', 'stock_code', 'lead1-Y'
numFeatures = len(schema_agg.names) - 4
def make_reg_model(stream, numFeatures):
    # inital streaming linear regression model
    mdl = StreamingLinearRegressionWithSGD();
    mdl.setInitialWeights(np.zeros(numFeatures).tolist())
    mdl.trainOn(stream)
    return mdl

cls_train_stream = dict()
cls_pred_stream = dict()
cls_reg_models = dict()
cls_reg_prediction = dict()
for cls in range(numClusters):
    cls_train_stream[cls] = rparse_train(cls_streams[cls])
    cls_pred_stream[cls] = rparse_pred(cls_streams[cls])
    
    cls_train_stream[cls].pprint()
    cls_pred_stream[cls].pprint()
    
    # reg model
    cls_reg_models[cls] = make_reg_model(cls_train_stream[cls], numFeatures)
    # cls_reg_models[cls].predictOn(cls_pred_stream[cls].map(lambda lp: lp.features))
    cls_reg_prediction[cls] = cls_reg_models[cls].predictOnValues(cls_pred_stream[cls])
    # valstream
    cls_reg_prediction[cls].pprint()

##### encapsulate result

In [None]:
def encapsulate_result(self, cls, real_stream, pred_val_stream):
    real_stream.foreachRDD(lambda rdd: self.encapsulate_real(cls, rdd))
    # find time to predict -> (label=-1, time)
    pred_time_stream = real_stream.filter(lambda v: v[1].label == -1).map(lambda v: (v[1].label, dt.strftime(v[0], '%Y-%m-%d %H:%M:%S')))
    pred_stream = pred_time_stream.join(pred_val_stream)
    pred_stream.pprint()
    # pred_stream.foreachRDD(lambda rdd: self.encapsulate_pred(cls, rdd))

def encapsulate_real(self, cls, rdd):
    res_dic = self.run_result#.result_dict
    cls_dic = res_dic.get(cls, dict())
    # (time_dt, (label=real_price, features)) -> (time, real_price)
    data = rdd.map(lambda v: (dt.strftime(v[0], '%Y-%m-%d %H:%M:%S'), v[1].label)).collect()
    for time, val in data:
        data_dic = cls_dic.get(time: dict())
        data_dic.update({'real':val})
        cls_dic.update({time, data_dic})
    res_dic.update({cls: cls_dic})
    print(f"{res_dic}")

##### run

In [19]:
try:    
    ssc.start()
    # ssc.stop(=True, stopGraceFully=True)
    ssc.awaitTermination(50)
    ssc.stop(stopSparkContext=False)
except Exception as e:
    print(e)
    ssc.stop(stopSparkContext=False)


# mdl = model.latestModel()
# print(f"centers: {mdl.clusterCenters}")

-------------------------------------------
Time: 2019-05-04 20:42:21
-------------------------------------------

-------------------------------------------
Time: 2019-05-04 20:42:21
-------------------------------------------

-------------------------------------------
Time: 2019-05-04 20:42:21
-------------------------------------------

-------------------------------------------
Time: 2019-05-04 20:42:21
-------------------------------------------

-------------------------------------------
Time: 2019-05-04 20:42:21
-------------------------------------------

-------------------------------------------
Time: 2019-05-04 20:42:21
-------------------------------------------

-------------------------------------------
Time: 2019-05-04 20:42:24
-------------------------------------------

-------------------------------------------
Time: 2019-05-04 20:42:24
-------------------------------------------

-------------------------------------------
Time: 2019-05-04 20:42:24
----------

#### other tests

In [None]:
mdl.centers

In [None]:
from pyspark.mllib.feature import Normalizer
v = Vectors.dense(range(3))
nor = Normalizer(1)
nor.transform(v)

In [None]:
rdd = sc.parallelize([v])
nor.transform(rdd).collect()
nor2 = Normalizer(float("inf"))
nor2.transform(v)

In [19]:
v

DenseVector([0.0, 1.0, 2.0])

In [28]:
np.array(list(map(lambda v: v.array, [v, v])))

array([[0., 1., 2.],
       [0., 1., 2.]])

In [33]:
np.sum(np.array(list(map(lambda v: v.array, [v]))), axis=0)

array([0., 1., 2.])

In [89]:
# def call(*vals):
#     for v in vals:
#         print(v)
# call(*schema.fields)
# np.ones(2).astype('int').tolist()

def call2(v7, **kwargs):
    print('*'*10)
    print(v7)
    for p in kwargs:
        print(f"{p}:{kwargs[p]}")

def call(v1, v2=2, v3=3, **kwargs):
    print(v1)
    print(v2)
    print(v3)
    for p in kwargs:
        print(f"{p}:{kwargs[p]}")
    
    call2(**kwargs)
        
pp = {'v7': 929, 'v8': 100}
call(12, v5=55, v3=3, v2=4, v6=66, **pp)

12
4
3
v5:55
v6:66
v7:929
v8:100
**********
929
v5:55
v6:66
v8:100
