In [1]:
from IPython.display import display
import math
import pandas as pd
import numpy as np
import matplotlib as mpl
import datetime
import datetime as dt
import matplotlib.pyplot as plt
import seaborn as sns
import pdb

from sklearn.linear_model import LinearRegression, Lasso, LassoCV
from sklearn.metrics import mean_squared_error, mean_absolute_error, explained_variance_score,r2_score, log_loss, make_scorer
from sklearn.model_selection import train_test_split, cross_val_score, KFold
from sklearn.preprocessing import OneHotEncoder
from scipy.stats import probplot as qqplot
from sklearn.svm.libsvm import predict_proba # need for the log_loss metric
import warnings
from scipy.stats import poisson
import lizec
import os
import pickle 
pd.options.display.float_format = '{:.5f}'.format
pd.set_option('display.max_columns', 500)
plt.style.use('classic')
warnings.filterwarnings("ignore", category=DeprecationWarning)
pd.options.mode.chained_assignment = None
print(plt.rcParams.get('figure.figsize'))
plt.rcParams['figure.figsize'] = (6, 4)
#%matplotlib inline  


[8.0, 6.0]


In [5]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Window
from pyspark.sql.types import *
from pyspark.sql.functions import avg, count, col, abs, max as spark_max, min as spark_min
from pyspark.sql.functions import sum as spark_sum, unix_timestamp, udf
import pyspark.sql.functions as psf


In [8]:
try:
    conf = SparkConf().setMaster("local").setAppName("My App2").setSystemProperty('spark.executor.memory', '50mb')
    sc = SparkContext(conf = conf)
    sqlContext = SQLContext(sc)
except:
    pass

# Paralelization

In [5]:
# Spark: cluster computing platform. Spark-core is the heart: a computing engine for
# scheduling, distributing etc

# jupyter already loads an SparkContext instance sc, like in pyspark shell


# driver program: this notebook, shell or standaline app. Driver programs define RDDs
# and actions on them. Driver programms access spark (spark core) through a SC object
# Driver programs manages nodes/executers and deploy tasks to them.,

# Create RDD from file. RDDs are immutable collections of objects. They are split into partitions
# which may be computed on different nodes of the cluster
#conf = SparkConf().setMaster("local").setAppName("My App2")
#sc = SparkContext(conf = conf)


lines = sc.textFile("README.md")

lines.first()

# Create RDD from a collection of objects (e.g., a list or a set)
values = sc.parallelize((1,2,3,4))

def add_one(x):
    return x+1

# Transformations: operations on RDDs that return a new RDD. They are computed lazily:
# only when you call them to action. At first, transformations return pointers to new RDDs.
b = values.filter(lambda x: x <2)
c = values.map(add_one)
# Actions: Compute actual output to the driver program or to storage system
print(b.collect(), c.collect())

d = values.reduce(lambda x, y: x+y)
d



[1] [2, 3, 4, 5]


10

In [6]:
l = [('Eve','Alice', 1,2,3,4)]
rdd = sc.parallelize(l)
sqlContext.createDataFrame(rdd)
#DataFrame[_1: string, _2: bigint]
df = sqlContext.createDataFrame(rdd)
df.show()
#dataframe = df.agg(avg('_3'))



k = [('Eve', 'Bob',2,2,2,2)]
rdd = sc.parallelize(k)
sqlContext.createDataFrame(rdd)
#DataFrame[_1: string, _2: bigint]
df2 = sqlContext.createDataFrame(rdd)
df2.show()


df3 = df.unionAll(df2)
df3.show()


+---+-----+---+---+---+---+
| _1|   _2| _3| _4| _5| _6|
+---+-----+---+---+---+---+
|Eve|Alice|  1|  2|  3|  4|
+---+-----+---+---+---+---+

+---+---+---+---+---+---+
| _1| _2| _3| _4| _5| _6|
+---+---+---+---+---+---+
|Eve|Bob|  2|  2|  2|  2|
+---+---+---+---+---+---+

+---+-----+---+---+---+---+
| _1|   _2| _3| _4| _5| _6|
+---+-----+---+---+---+---+
|Eve|Alice|  1|  2|  3|  4|
|Eve|  Bob|  2|  2|  2|  2|
+---+-----+---+---+---+---+



# Single value rdd feature extraction

In [8]:
# group by customer and join with the base data
data = df

dataframe = data.groupBy('customid').agg(spark_max(data.timestamp))

dataframe.show()

rdd_join = df.join(dataframe, on='customid')

rdd_join.show()

+--------+--------------+
|customid|max(timestamp)|
+--------+--------------+
|     CM1|           1.5|
|     CX2|           6.0|
+--------+--------------+

+--------+------+-----+---------+--------------+
|customid|procid|speed|timestamp|max(timestamp)|
+--------+------+-----+---------+--------------+
|     CM1|   aa1|  100|      0.1|           1.5|
|     CM1|   aa1|  110|      0.2|           1.5|
|     CM1|   aa1|  110|      0.9|           1.5|
|     CM1|   aa1|  100|      1.5|           1.5|
|     CM1|   xx2|  100|      0.1|           1.5|
|     CM1|   xx2|  110|      0.2|           1.5|
|     CM1|   xx2|  210|      0.9|           1.5|
|     CM1|   xx2|  100|      1.5|           1.5|
|     CX2|   bb9|  100|      0.1|           6.0|
|     CX2|   bb9|  100|      0.2|           6.0|
|     CX2|   bb9|  110|      6.0|           6.0|
|     CX2|   bb9|  100|     0.18|           6.0|
+--------+------+-----+---------+--------------+



In [9]:
# same works for grouping on processid
dummy = df.groupBy('procid').agg(spark_max(df.timestamp))

dummy.show()


+------+--------------+
|procid|max(timestamp)|
+------+--------------+
|   bb9|           6.0|
|   aa1|           1.5|
|   xx2|           1.5|
+------+--------------+



# feature extraction with window functions

In [15]:
# 

# Define a windowing column.
# Pyspark.sql.functions.Column.over

def s_to_h(x): return x/(60*60)
my_func = udf(s_to_h, DoubleType())

def extr_ft_1 (proc_data, w, limit=4):
    return proc_data.withColumn(
        "speed_feature", 
        psf.sum((proc_data.speed > limit).cast("float")).over(w)
    )


def extr_ft_0(proc_data, w):
    proc_data = proc_data.withColumn(
        "Starttime", 
        psf.min(proc_data.timestamp).over(w)
        )
    
    proc_data = proc_data.withColumn(
        "Endtime", 
        psf.max(proc_data.timestamp).over(w)
        )
    time_diff = (unix_timestamp(proc_data.Endtime)-unix_timestamp(proc_data.Starttime))
    proc_data = proc_data.withColumn(
        "TripDurationSeconds", 
        time_diff
        )
    proc_data = proc_data.withColumn(
        "TripDuration", 
        my_func('TripDurationSeconds')
        )
    
    
    return proc_data


w = Window.partitionBy("procid")


l = [('CM1','aa1', 00.0,datetime.datetime(2017, 5, 30, 20,0,0)),\
     ('CM1','aa1', 5.0,datetime.datetime(2017, 5, 30, 20,1,0)),\
     ('CM1','aa1', 10.0,datetime.datetime(2017, 5, 30, 20,2,0)),\
     ('CM1','aa1', 15.0,datetime.datetime(2017, 5, 30, 20,2,30)),\
     
     ('CM1','bb1', 00.0,datetime.datetime(2017, 5, 30, 19,0,0)),\
     ('CM1','bb1', 3.0,datetime.datetime(2017, 5, 30, 19,1,0)),\
     ('CM1','bb1', 6.0,datetime.datetime(2017, 5, 30, 19,2,0)),\
     ('CM1','bb1', 9.0,datetime.datetime(2017, 5, 30, 19,9,0)),\
     
     ('CM2','cc2', 2.0,datetime.datetime(2017, 5, 30, 23,0,0)),\
     ('CM2','cc2', 4.0,datetime.datetime(2017, 5, 30, 23,1,0)),\
     ('CM2','cc2', 8.0,datetime.datetime(2017, 5, 30, 23,2,0)),\
     ('CM2','cc2', 2.0,datetime.datetime(2017, 5, 30, 23,3,0)),\
]
"""
l = [('CM1','aa1', 00.0,2015-01-02 22:59:58),('CM1','aa1', 10.0,0.2),\
     ('CM1','aa1', 10.0,2015-01-02 22:59:58),('CM1','aa1', 00.0,1.5),\
     ('CM1','xx2', 100.0,0.1),('CM1','xx2', 110.0,0.2),\
     ('CM1','xx2', 210.0,0.9),('CM1','xx2', 100.0,1.5),\
     ('CX2','bb9', 100.0,0.1),('CX2','bb9', 100.0,0.2),\
    ('CX2','bb9', 110.0,6.0),('CX2','bb9', 100.0,0.18)]
"""
schema = StructType([StructField('customid', StringType(), True),
                     StructField('procid', StringType(), True),
                     StructField('speed', DoubleType(), True),
                     StructField('timestamp', TimestampType(), True)]
                     )

rdd = sc.parallelize(l)

df = sqlContext.createDataFrame(rdd,schema)

print(df.schema)

df.show()
df1 = extr_ft_1(df, w)
df0 = extr_ft_0(df1, w)
#df0.show()

df0=df0[['procid', 'speed_feature', 'Starttime', 'Endtime', 'TripDuration' ]].distinct()
df0.show()


StructType(List(StructField(customid,StringType,true),StructField(procid,StringType,true),StructField(speed,DoubleType,true),StructField(timestamp,TimestampType,true)))
+--------+------+-----+-------------------+
|customid|procid|speed|          timestamp|
+--------+------+-----+-------------------+
|     CM1|   aa1|  0.0|2017-05-30 20:00:00|
|     CM1|   aa1|  5.0|2017-05-30 20:01:00|
|     CM1|   aa1| 10.0|2017-05-30 20:02:00|
|     CM1|   aa1| 15.0|2017-05-30 20:02:30|
|     CM1|   bb1|  0.0|2017-05-30 19:00:00|
|     CM1|   bb1|  3.0|2017-05-30 19:01:00|
|     CM1|   bb1|  6.0|2017-05-30 19:02:00|
|     CM1|   bb1|  9.0|2017-05-30 19:09:00|
|     CM2|   cc2|  2.0|2017-05-30 23:00:00|
|     CM2|   cc2|  4.0|2017-05-30 23:01:00|
|     CM2|   cc2|  8.0|2017-05-30 23:02:00|
|     CM2|   cc2|  2.0|2017-05-30 23:03:00|
+--------+------+-----+-------------------+

+------+-------------+-------------------+-------------------+--------------------+
|procid|speed_feature|          Starttime|

# Append accel values

In [16]:

l = [('CM1','aa1',  3.0,  None, datetime.datetime(2017, 5, 30, 20,0,1)),\
     ('CM1','aa1',  None,    .1, datetime.datetime(2017, 5, 30, 20,0,4)),\
     ('CM1','aa1',  None,    .2, datetime.datetime(2017, 5, 30, 20,0,8)),\
     ('CM1','aa1',  None,    .3, datetime.datetime(2017, 5, 30, 20,0,12)),\
     ('CM1','aa1',  None,     .4, datetime.datetime(2017, 5, 30, 20,0,30)),\
     ('CM1','aa1',  None,    .0, datetime.datetime(2017, 5, 30, 20,0,33)),\
     ('CM1','aa1', 2.0,    None, datetime.datetime(2017, 5, 30, 20,0,37)),\
     ('CM1','aa1',  None,    .1, datetime.datetime(2017, 5, 30, 20,0,39)),\
     ('CM1','aa1',  None,     .0, datetime.datetime(2017, 5, 30, 20,0,39)),\
     ('CM1','aa1',  None,     .2, datetime.datetime(2017, 5, 30, 20,0,49)),\
     ('CM1','aa1',  None,    .8, datetime.datetime(2017, 5, 30, 20,0,55)),\
     ('CM1','aa1',  4.0,  None, datetime.datetime(2017, 5, 30, 20,0,59))
        ]

schema = StructType([StructField('customid', StringType(), True),
                     StructField('procid', StringType(), True),
                     StructField('speed', DoubleType(), True),
                     StructField('wait', DoubleType(), True),
                     StructField('timestamp', TimestampType(), True)]
                     )

rdd = sc.parallelize(l)

df = sqlContext.createDataFrame(rdd,schema)

df = df.withColumn('u_ts', unix_timestamp(df.timestamp))

w = \
  Window.partitionBy(df['procid']).orderBy(df['timestamp'].asc())#.rangeBetween(-1, 0)

df = df.withColumn('delay', (psf.lag(df.u_ts, 0).over(w))-(psf.lag(df.u_ts, 1).over(w)))
    
df.show()


w = \
  Window.partitionBy(df['procid']).orderBy(df['timestamp'].asc())#.rangeBetween(-1, 0)
 

for i in range(8):
    """
    df = df.withColumn('windowresult1',
                 (psf.lag(df.wait, 0).over(w)))

    df = df.withColumn('windowresult2',
                 psf.lag(df.speed, 1).over(w))
    """
    df = df.withColumn('speed',
                 psf.when(df.speed.isNull() == True, (psf.lag(df.wait, 0).over(w))*df.delay+psf.lag(df.speed, 1).over(w))\
                       .otherwise(df.speed))



    #df = df.withColumn('speed',psf.coalesce(df.speed, df.result))

df.show()




+--------+------+-----+----+-------------------+----------+-----+
|customid|procid|speed|wait|          timestamp|      u_ts|delay|
+--------+------+-----+----+-------------------+----------+-----+
|     CM1|   aa1|  3.0|null|2017-05-30 20:00:01|1496167201| null|
|     CM1|   aa1| null| 0.1|2017-05-30 20:00:04|1496167204|    3|
|     CM1|   aa1| null| 0.2|2017-05-30 20:00:08|1496167208|    4|
|     CM1|   aa1| null| 0.3|2017-05-30 20:00:12|1496167212|    4|
|     CM1|   aa1| null| 0.4|2017-05-30 20:00:30|1496167230|   18|
|     CM1|   aa1| null| 0.0|2017-05-30 20:00:33|1496167233|    3|
|     CM1|   aa1|  2.0|null|2017-05-30 20:00:37|1496167237|    4|
|     CM1|   aa1| null| 0.1|2017-05-30 20:00:39|1496167239|    2|
|     CM1|   aa1| null| 0.0|2017-05-30 20:00:39|1496167239|    0|
|     CM1|   aa1| null| 0.2|2017-05-30 20:00:49|1496167249|   10|
|     CM1|   aa1| null| 0.8|2017-05-30 20:00:55|1496167255|    6|
|     CM1|   aa1|  4.0|null|2017-05-30 20:00:59|1496167259|    4|
+--------+

# List chunking (generator), multi-file pickling of pyspark dataframes

### Create dummy dataframe with box ids
### Get a list with chunks of box-ids in string format for hiveContext
### Dump the liste as pickle

In [15]:
import random

class BoxIdHandlers:
    

    
    def __init__(self):
        
        self.l = [('CM{}'.format(np.random.randint(100)), np.random.randint(2)) for i in range(100)]
        self.df = pd.DataFrame(self.l, columns=['Id', 'crash'])


        
        
    def chunks(self, lizt, size):
        """Yield successive -sized chunks from list"""
        for i in range(0, len(lizt), size):
            
            yield lizt[i:i + size]
    
    def getBoxChunksAsStrings(self, chunksize= 10, totalnumber= 100):
        
        boxes = self.df.loc[self.df.crash ==  0 ].Id.tolist() + self.df.loc[self.df.crash > 0 ].Id.tolist()
        
        random.shuffle(boxes)
        
        boxes = boxes[:totalnumber]
            
        return ['"'+'","'.join(chunk)+'"' for chunk in self.chunks(boxes,chunksize)]
        #return [chunk for chunk in chunks(boxes,chunksize)]
    
    
    def pickleNdump(seld, boxlist, filename = 'boxstring'):
        
        script_dir = os.path.dirname(os.getcwd()) #<-- absolute dir the script is in
        rel_path = "boxlists/{}.pkl".format(filename)
        abs_file_path = os.path.join(script_dir, rel_path)
        with open(abs_file_path, 'wb') as f:
                pickle.dump(boxlist, f)
    
    

BoxIdHandler = BoxIdHandlers()

v = BoxIdHandler.getBoxChunksAsStrings(chunksize = 2, totalnumber = 11)

BoxIdHandler.pickleNdump(v, filename = 'boxstringgg')

### load the pickled list an test output

In [18]:



script_dir = os.path.dirname(os.getcwd()) #<-- absolute dir the script is in
rel_path = "boxlists/boxstring.pkl"
abs_file_path = os.path.join(script_dir, rel_path)
with open(abs_file_path, 'rb') as f:
    test = pickle.load(f)
for chunk in test: print(chunk), print(abs_file_path)

"CM62","CM39"
/home/lizecallys/Dropbox/Schatztruhe 911/J's Kram/IT_Projekte/Python/boxlists/boxstring.pkl
"CM93","CM97"
/home/lizecallys/Dropbox/Schatztruhe 911/J's Kram/IT_Projekte/Python/boxlists/boxstring.pkl
"CM46","CM80"
/home/lizecallys/Dropbox/Schatztruhe 911/J's Kram/IT_Projekte/Python/boxlists/boxstring.pkl
"CM98","CM36"
/home/lizecallys/Dropbox/Schatztruhe 911/J's Kram/IT_Projekte/Python/boxlists/boxstring.pkl
"CM36","CM13"
/home/lizecallys/Dropbox/Schatztruhe 911/J's Kram/IT_Projekte/Python/boxlists/boxstring.pkl
"CM28"
/home/lizecallys/Dropbox/Schatztruhe 911/J's Kram/IT_Projekte/Python/boxlists/boxstring.pkl


### Read a file with dates. Return iteratively one date after another, if the date is not in the file. Write the date afterwards to the file.

In [36]:

def get_path():
    script_dir = os.path.dirname(os.getcwd()) #<-- absolute dir the script is in
    rel_path = "processed_days.txt"
    abs_file_path = os.path.join(script_dir, rel_path)
    return abs_file_path

first = True
for i in range(8):
    start_day = dt.date(2016,10,1)
    if first:
        next_day = start_day + dt.timedelta(days = 1)
        first = False
    else:
        next_day = next_day + dt.timedelta(days = 1)
    
    with open(get_path()) as f:
        lines = f.read().splitlines()
    
    if str(next_day) not in lines:
        print('New day, writing {}'.format(str(next_day)))
        
        with open(get_path(), 'a') as f:
            f.write(str(next_day)+'\n')
    
    #print(str(start_day), str(next_day))

New day, writing 2016-10-02
New day, writing 2016-10-03
New day, writing 2016-10-04
New day, writing 2016-10-05
New day, writing 2016-10-06
New day, writing 2016-10-07
New day, writing 2016-10-08
New day, writing 2016-10-09


In [22]:
with open(get_path()) as f:
    lines = f.read().splitlines()
    
'hello' in lines

True

### For all chunks get a pyspark dataframe and dump each in a pickle file

In [19]:
def features_to_pickle(file_counter):
    
    l = [('CM{}'.format(np.random.randint(100)),np.random.rand()) for i in range(10)]

    schema = StructType([StructField('Id', StringType(), True),
                     StructField('speed', DoubleType(), True),
    ]
                     )

    rdd = sc.parallelize(l)

    df = sqlContext.createDataFrame(rdd,schema)

    rows = df.collect()
    
    a = [row.asDict() for row in row_list]
    
    script_dir = os.path.dirname(os.getcwd()) #<-- absolute dir the script is in
    rel_path = 'trips/trips_{}.pkl'.format(file_counter)
    abs_file_path = os.path.join(script_dir, rel_path)
    with open(abs_file_path, 'wb') as f:
        pickle.dump(a, f)
    

for i, chunk in enumerate(test): features_to_pickle(i)

### fetch all pickled pyspark dataframe files from directory and combine them in one pandas dataframe

In [62]:
def combine_pickles_to_one_df():
    
    script_dir = os.path.dirname(os.getcwd()) #<-- absolute dir the script is in
    
    rel_path = "trips/"
    
    abs_folder_path = os.path.join(script_dir, rel_path)
    
    liste = []
    
    switch = True
    
    for file in os.listdir(abs_folder_path):
        
        abs_file_path = os.path.join(abs_folder_path, file)
        

        
        with open(abs_file_path, 'rb') as f:  
    
            unpickled_file = pickle.load(f)
    
            liste.append(pd.DataFrame(unpickled_file))
        
            while switch:
                
                print('y')
                
                df = pd.DataFrame(unpickled_file)
                
                switch = False
                
                
                
                
            dff = pd.DataFrame(unpickled_file)
                
            df = df.append(dff, ignore_index = True)
                
        
    return df
    
df = combine_pickles_to_one_df()
    
df



y


Unnamed: 0,Id,speed
0,CM41,0.61700
1,CM38,0.02875
2,CM79,0.79116
3,CM46,0.38107
4,CM46,0.13130
5,CM4,0.70334
6,CM61,0.76189
7,CM79,0.42782
8,CM34,0.82503
9,CM33,0.88952


# TODO

In [18]:
import datetime

a = datetime.datetime(2017, 5, 30, 20,0,0)

print(a)

2017-05-30 20:00:00


In [10]:
sc._conf.getAll()
dir(sc)

['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__gt__',
 '__hash__',
 '__init__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'applicationId',
 'binaryFiles',
 'binaryRecords',
 'broadcast',
 'cancelAllJobs',
 'cancelJobGroup',
 'defaultMinPartitions',
 '

In [15]:
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.driver.port', '49823'),
 ('spark.app.id', 'local-1505851443817'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.host', '192.168.178.49'),
 ('spark.app.name', 'My App2'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client')]

# Example of working on partitions

In [20]:
#exec(open('script.py').read())
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, HiveContext
from operator import itemgetter
#conf = SparkConf().setAppName("temp")
#conf = SparkConf().setMaster("local").setAppName("My App2").setSystemProperty('spark.executor.memory', '8000mb')
#sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
hiveCtx=HiveContext(sc)


NUMBER_OF_PARTITIONS = 1


def get_feature_1(data):


    result = {}

    feature = 0
    was_excess = False
    trip_old = "no_trip"
    for key_value in data:

        if key_value:
            key = list(key_value.keys())[0]
            value = key_value[key]
            if key not in result:
                result[key] = 0
            value = sorted(value, key=itemgetter(0,1))
            #result = (key, value)
            #print(value_sorted)
            
            for v in value:

                time = v[1]
                trip = v[0]

                if trip != trip_old:
                    was_excess = False
                if was_excess:
                    feature = (time - time_old)
                    if key in result:
                        result[key] += feature
                    else:
                        result[key] = feature

                    was_excess = False

                if v[2] - v[3] > 0:
                    was_excess = True

                time_old = time
                trip_old = trip
            
    return result
    
def get_feature_2(data):


    result = {}

    feature = 0
    was_excess = False
    trip_old = "no_trip"
    for key_value in data:

        if key_value:
            key = list(key_value.keys())[0]
            value = key_value[key]
            if key not in result:
                result[key] = 0
            value = sorted(value, key=itemgetter(0,1))
            #result = (key, value)
            #print(value_sorted)

            for v in value:

                time = v[1]
                trip = v[0]
                if key in result:
                    result[key] += feature
                else:
                    result[key] = feature

    return result



def generate_trip_features_on_partition(iterator):
    data = {}
    data2 = []
    for key, value in iterator:
        data2.append((key, value))
        if key in data:
            data[key] +=  [value]
            t = 1
        else:
            data[key] = [value]
  
    print('data:',data)
    feature_1 = get_feature_1([data])
    feature_2 = get_feature_2([data])


    trips = set(list(feature_1.keys()) + list(feature_2.keys()))
    print('trips', trips)
    result = {}    
    for trip in trips:
        result[trip] = {'f1':0, 'f2':0}
        if trip in feature_1:
            result[trip]['f1'] = feature_1[trip]
        if trip in feature_2:
            result[trip]['f2'] = feature_2[trip]

    return [feature_2]
    #return [result]

def generate_trip_features(data):

            NUMBER_OF_PARTITIONS = 1
            #data_rdd = data.rdd
            data_rdd = sc.parallelize([ ['t4', 0, 51, 50, 1.1, -0.1, 9.8],\
                                       ['t2', 10, 48, 50, 0.8, -0.2, 9.8],\
                                       ['t2', 12, 54, 50, 0.5, -1.0 , 9.8],\
                                       ['t2', 5, 35, 30, 1.1, -0.1,9.8],\
                                       ['t2', 6, 38, 30, 0.8, -0.2, 9.8],\
                                       ['t2', 7, 34, 30, 0.5, -1.0 , 9.8] ])

            data_mapped = data_rdd.map(lambda x: (x[0], [a for a in x])).partitionBy(NUMBER_OF_PARTITIONS)
            print('data_mapped:', data_mapped.collect())
            results = data_mapped.mapPartitions(generate_trip_features_on_partition).collect()
        
            print(results)        

            return












data_mapped: [('t4', ['t4', 0, 51, 50, 1.1, -0.1, 9.8]), ('t2', ['t2', 10, 48, 50, 0.8, -0.2, 9.8]), ('t2', ['t2', 12, 54, 50, 0.5, -1.0, 9.8]), ('t2', ['t2', 5, 35, 30, 1.1, -0.1, 9.8]), ('t2', ['t2', 6, 38, 30, 0.8, -0.2, 9.8]), ('t2', ['t2', 7, 34, 30, 0.5, -1.0, 9.8])]
[{'t4': 0}]
StructType(List(StructField(customid,StringType,true),StructField(procid,StringType,true),StructField(speed,DoubleType,true),StructField(timestamp,TimestampType,true)))


[('CM1', ['CM1', 'aa1', 0.0, datetime.datetime(2017, 5, 30, 20, 0)]),
 ('CM1', ['CM1', 'aa1', 5.0, datetime.datetime(2017, 5, 30, 20, 1)]),
 ('CM1', ['CM1', 'aa1', 10.0, datetime.datetime(2017, 5, 30, 20, 2)]),
 ('CM1', ['CM1', 'aa1', 15.0, datetime.datetime(2017, 5, 30, 20, 2, 30)]),
 ('CM1', ['CM1', 'bb1', 0.0, datetime.datetime(2017, 5, 30, 19, 0)]),
 ('CM1', ['CM1', 'bb1', 3.0, datetime.datetime(2017, 5, 30, 19, 1)]),
 ('CM1', ['CM1', 'bb1', 6.0, datetime.datetime(2017, 5, 30, 19, 2)]),
 ('CM1', ['CM1', 'bb1', 9.0, datetime.datetime(2017, 5, 30, 19, 9)]),
 ('CM2', ['CM2', 'cc2', 2.0, datetime.datetime(2017, 5, 30, 23, 0)]),
 ('CM2', ['CM2', 'cc2', 4.0, datetime.datetime(2017, 5, 30, 23, 1)]),
 ('CM2', ['CM2', 'cc2', 8.0, datetime.datetime(2017, 5, 30, 23, 2)]),
 ('CM2', ['CM2', 'cc2', 2.0, datetime.datetime(2017, 5, 30, 23, 3)])]

In [48]:
#http://parrotprediction.com/partitioning-in-apache-spark/
from pprint import pprint
from operator import itemgetter

def get_dataframe():
    
        l = [('CM1','aa1', 00.0,datetime.datetime(2017, 5, 30, 20,0,0)),\
             ('CM1','aa1', 5.0,datetime.datetime(2017, 5, 30, 20,1,0)),\
             ('CM1','aa1', 10.0,datetime.datetime(2017, 5, 30, 20,2,0)),\
            ('CM1','aa1', 15.0,datetime.datetime(2017, 5, 30, 20,2,30)),\
            ('CM1','bb1', 00.0,datetime.datetime(2017, 5, 30, 19,0,0)),\
            ('CM1','bb1', 3.0,datetime.datetime(2017, 5, 30, 19,1,0)),\
            ('CM1','bb1', 6.0,datetime.datetime(2017, 5, 30, 19,2,0)),\
            ('CM1','bb1', 9.0,datetime.datetime(2017, 5, 30, 19,9,0)),\
            ('CM2','cc2', 2.0,datetime.datetime(2017, 5, 30, 23,0,0)),\
            ('CM2','cc2', 4.0,datetime.datetime(2017, 5, 30, 23,1,0)),\
            ('CM2','cc2', 8.0,datetime.datetime(2017, 5, 30, 23,2,0)),\
            ('CM2','cc2', 2.0,datetime.datetime(2017, 5, 30, 23,3,0)),\
            ]

        schema = StructType([StructField('customid', StringType(), True),
                     StructField('procid', StringType(), True),
                     StructField('speed', DoubleType(), True),
                     StructField('timestamp', TimestampType(), True)]
                     )

        rdd = sc.parallelize(l)

        df = sqlContext.createDataFrame(rdd,schema)

        #print(df.schema)
        
        df.show()
        
        return df
    
def get_speed_feature(trips_as_dict, limit = 3):
    """
    :input:   type dixt
            {'CM1': [['CM1', 'aa1', 0.0, 1],
                    ['CM1', 'aa1', 5.0, 2],
                     ['CM1', 'aa1', 10.0, 3],
                     ['CM1', 'aa1', 15.0, 3.5],
                     ['CM1', 'bb1', 0.0, 3.5],
                     ['CM1', 'bb1', 3.0, 3.5],
                     ['CM1', 'bb1', 6.0, 3.5],
                     ['CM1', 'bb1', 9.0, 3.5]],
             'CM2': [['CM2', 'cc2', 2.0, 3.5],
                     ['CM2', 'cc2', 4.0, 3.5],
                     ['CM2', 'cc2', 8.0, 3.5],
                     ['CM2', 'cc2', 2.0, 3.5]]}
                     
    :output: {'CM1':{ft1:99, ft2:27272, ...}, 'CM2':{ft1:888, ft2:27272,...}}

    
    """
    print('input get_speed_features\n')
    pprint(trips_as_dict)
    
    overall_result = {}
    result = {}
    for (trip_id, data_point_list) in trips_as_dict.items():
        
        #print(trip_id, data_point_list) #(s)
        
        #print('before sort\n')
        #pprint(data_point_list) # (s)
        data_point_list = sorted(data_point_list, key = itemgetter(0,3))       
        print('after sort\n')
        pprint(data_point_list) # (s)
        
        counter = 0
        for number, data_point in enumerate(data_point_list):
            if data_point[2] > limit:
                if number == 0:
                    timeperiod_of_offense = data_point_list[number][3]
                else:
                    timeperiod_of_offense = data_point_list[number][3] -\
                                            data_point_list[number-1][3]
                print('tpr_offense: ', timeperiod_of_offense)
                counter +=1*timeperiod_of_offense
        result[trip_id] = ('ft1',counter)
    
    print('Output get_speed_feature\n')
    pprint(result)
    
    return result
            
def get_speed_feature2(trips_as_dict, limit = 5):
    """
    :input:   type dixt
            {'CM1': [['CM1', 'aa1', 0.0, 1],
                    ['CM1', 'aa1', 5.0, 2],
                     ['CM1', 'aa1', 10.0, 3],
                     ['CM1', 'aa1', 15.0, 3.5],
                     ['CM1', 'bb1', 0.0, 3.5],
                     ['CM1', 'bb1', 3.0, 3.5],
                     ['CM1', 'bb1', 6.0, 3.5],
                     ['CM1', 'bb1', 9.0, 3.5]],
             'CM2': [['CM2', 'cc2', 2.0, 3.5],
                     ['CM2', 'cc2', 4.0, 3.5],
                     ['CM2', 'cc2', 8.0, 3.5],
                     ['CM2', 'cc2', 2.0, 3.5]]}
                     
    :output: {'CM1':{ft1:99, ft2:27272, ...}, 'CM2':{ft1:888, ft2:27272,...}}

    
    """
    print('input get_speed_features\n')
    pprint(trips_as_dict)
    
    overall_result = {}
    result = {}
    for (trip_id, data_point_list) in trips_as_dict.items():
        
        #print(trip_id, data_point_list) #(s)
        
        #print('before sort\n')
        #pprint(data_point_list) # (s)
        data_point_list = sorted(data_point_list, key = itemgetter(0,3))       
        print('after sort\n')
        pprint(data_point_list) # (s)
        
        counter = 0
        for number, data_point in enumerate(data_point_list):
            if data_point[2] > limit:
                if number == 0:
                    timeperiod_of_offense = data_point_list[number][3]
                else:
                    timeperiod_of_offense = data_point_list[number][3] -\
                                            data_point_list[number-1][3]
                print('tpr_offense: ', timeperiod_of_offense)
                counter +=1*timeperiod_of_offense
        result[trip_id] = ('ft2',counter)
    
    print('Output get_speed_feature\n')
    pprint(result)
    
    return result
            
            
def generate_trip_features_on_partition(kv_iterator):
    """
    Hint: using k-v pairs where they key is inside the value again.\
          Function must get and retur an iterable.
          Immutable are ints, strings and tuples.
    
    Input: KV-tuple aus trip-id und Fahrdaten
    ('CM1', ['CM1', 'bb1', 9.0, 3.5])
    ('CM1', ['CM1', 'bb1', 9.0, 9.5])
    ('CM1', ['CM1', 'bb1', 9.0, 9.5])
    ('CM2', ['CM2', 'cc2', 2.0, 3.5]),
    
    Output: list(dict('CM1':{tripID: 'CM1', feature1: value, .....},
                      'CM2':{tripID: 'CM2', feature1: value, .....}))
    
    """
    print('input generte trip features\n')
    pprint(kv_iterator)
    
    S = {}
    key_list = set()
    for key, value in kv_iterator:
        
        key_list = key_list | set([key]) # set works on iterables -> just the key-string would get split
        
        if key not in S.keys():
            S[key] = [value]
        
        else:
            S[key] += [value]
    all_features_per_trip = []
    all_features_per_trip.append(get_speed_feature(S)) # {'CM1': ('ft1', 2.055), 'CM2': ('ft1', 5.5)})
    all_features_per_trip.append(get_speed_feature2(S)) # {'CM1': ('ft1', 2.055), 'CM2': ('ft1', 5.5)})
    

    result = {key:{feature_list in all_features_per_trip} for key in key_list}

"""
df =  get_dataframe()

df_mapped = df.rdd.map(lambda x: (x[0], [a for a in x]))
pprint(df_mapped.collect()) #(s)

#generate_trip_features(0)
df_mapped = df_mapped.collect()# (t)
"""
df_mapped = [('CM1', ['CM1', 'aa1', 0.0, 1]),
 ('CM1', ['CM1', 'aa1', 5.0, 2]),
 ('CM1', ['CM1', 'aa1', 10.0, 3]),
 ('CM1', ['CM1', 'aa1', 15.0, 3.544]),
 ('CM1', ['CM1', 'bb1', 0.0, 1.5]),
 ('CM1', ['CM1', 'bb1', 3.0, 1.2]),
 ('CM1', ['CM1', 'bb1', 6.0, 1.011]),
 ('CM1', ['CM1', 'bb1', 9.0, 3.5]),
 ('CM2', ['CM2', 'cc2', 2.0, 31.5]),
 ('CM2', ['CM2', 'cc2', 4.0, 4.5]),
 ('CM2', ['CM2', 'cc2', 8.0, 32.5]),
 ('CM2', ['CM2', 'cc2', 2.0, 399.5])]
generate_trip_features_on_partition(df_mapped) # (t)




input generte trip features

[('CM1', ['CM1', 'aa1', 0.0, 1]),
 ('CM1', ['CM1', 'aa1', 5.0, 2]),
 ('CM1', ['CM1', 'aa1', 10.0, 3]),
 ('CM1', ['CM1', 'aa1', 15.0, 3.544]),
 ('CM1', ['CM1', 'bb1', 0.0, 1.5]),
 ('CM1', ['CM1', 'bb1', 3.0, 1.2]),
 ('CM1', ['CM1', 'bb1', 6.0, 1.011]),
 ('CM1', ['CM1', 'bb1', 9.0, 3.5]),
 ('CM2', ['CM2', 'cc2', 2.0, 31.5]),
 ('CM2', ['CM2', 'cc2', 4.0, 4.5]),
 ('CM2', ['CM2', 'cc2', 8.0, 32.5]),
 ('CM2', ['CM2', 'cc2', 2.0, 399.5])]
input get_speed_features

{'CM1': [['CM1', 'aa1', 0.0, 1],
         ['CM1', 'aa1', 5.0, 2],
         ['CM1', 'aa1', 10.0, 3],
         ['CM1', 'aa1', 15.0, 3.544],
         ['CM1', 'bb1', 0.0, 1.5],
         ['CM1', 'bb1', 3.0, 1.2],
         ['CM1', 'bb1', 6.0, 1.011],
         ['CM1', 'bb1', 9.0, 3.5]],
 'CM2': [['CM2', 'cc2', 2.0, 31.5],
         ['CM2', 'cc2', 4.0, 4.5],
         ['CM2', 'cc2', 8.0, 32.5],
         ['CM2', 'cc2', 2.0, 399.5]]}
after sort

[['CM2', 'cc2', 4.0, 4.5],
 ['CM2', 'cc2', 2.0, 31.5],
 ['CM2', 'cc2', 

In [19]:
S = {'a':1, 'b':2, 'c':3}
O = {'a':{'f':10}, 'b':{'h':20}, 'c':{'g':190}}
L = [1,2,3,4]

print(S.keys(), S.values(), S.items())
for i in S: print(i)

print(L+L)
L.append(1)
print (L)

print(O['a'])

dict_keys(['b', 'a', 'c']) dict_values([2, 1, 3]) dict_items([('b', 2), ('a', 1), ('c', 3)])
b
a
c
[1, 2, 3, 4, 1, 2, 3, 4]
[1, 2, 3, 4, 1]
{'f': 10}
