# Import Your Own Data

## Import Packages

In [1]:
%reload_ext autoreload
from util.dependencies import *
from settings import USER_ID
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, StringType, FloatType, TimestampType, IntegerType
from cerebralcortex.data_importer.data_parsers import csv_data_parser
from cerebralcortex.data_importer import import_dir
from pyspark.sql.functions import minute, second, mean, window
from pyspark.sql import functions as F
#from tt import magic_method

# Import Data

In [None]:
# Import every .csv file under sample_data/ into CerebralCortex for USER_ID,
# parsing each file with csv_data_parser and asking for an import report.
# NOTE(review): this comment used to name the stream "demo-location-data-stream",
# but the cells below read "iot-data-stream" — confirm which name the parser assigns.
import_dir(
    # where to read from and how to parse it
    input_data_dir="sample_data/",
    data_file_extension=[".csv"],
    data_parser=csv_data_parser,
    # whose data it is and which CerebralCortex configuration to write with
    user_id=USER_ID,
    cc_config="/home/md2k/cc_conf/",
    gen_report=True,
)

## Create CerebralCortex object

In [2]:
# Create the CerebralCortex kernel from the same config directory used by import_dir above.
CC = Kernel("/home/md2k/cc_conf/")

## View Imported Data

In [3]:
# Load the imported stream by name and print its first 5 rows without truncating values.
location_stream = CC.get_stream("iot-data-stream")
location_stream.show(5, truncate=False)
#location_stream.plot()

+-------------------+-------------------+-------------------+-------+------------------------------------+
|timestamp          |localtime          |some_vals          |version|user                                |
+-------------------+-------------------+-------------------+-------+------------------------------------+
|2019-01-09 17:35:00|2019-01-09 17:35:00|0.0851887269487499 |1      |00000000-afb8-476e-9872-6472b4e66b68|
|2019-01-09 17:35:01|2019-01-09 17:35:01|0.16867549655743164|1      |00000000-afb8-476e-9872-6472b4e66b68|
|2019-01-09 17:35:02|2019-01-09 17:35:02|0.7404850816560419 |1      |00000000-afb8-476e-9872-6472b4e66b68|
|2019-01-09 17:35:03|2019-01-09 17:35:03|0.7131609970182962 |1      |00000000-afb8-476e-9872-6472b4e66b68|
|2019-01-09 17:35:04|2019-01-09 17:35:04|0.24253081438369195|1      |00000000-afb8-476e-9872-6472b4e66b68|
+-------------------+-------------------+-------------------+-------+------------------------------------+
only showing top 5 rows



In [None]:
# @pandas_udf("some_vals double", PandasUDFType.GROUPED_MAP)
# def subtract_mean(pdf):
#     # pdf is a pandas.DataFrame
#     return pd.DataFrame(pdf["some_vals"])

# location_stream.run_algo(subtract_mean).show()
#location_stream.compute_max().show(truncate=False)
#df.withColumn('d', fun_function(df.some_vals)).show()

#df.groupBy("timestamp", window("timestamp", "2 seconds")).agg(df["mean"]).show()

# df2.groupBy("window").agg(F.mean('avg(some_vals)')).show(truncate=False)


In [13]:
schema = StructType([
    StructField("id", FloatType()),
    StructField("id2", IntegerType()),
    StructField("id3", IntegerType())
])

@F.udf(schema)
def udfName(some_vals):
    """Toy struct-returning UDF: 50 x the sum of the input values, plus two constants.

    Args:
        some_vals: iterable of numbers. Presumably the values run_algorithm collects
            per window (its output shows one `merged_column` per window) — TODO confirm.
            ``None`` is treated as an empty input.

    Returns:
        list ``[id, id2, id3]`` matching `schema`: ``[float(sum(some_vals)) * 50, 4, 32]``.
    """
    # float() matters: Spark yields null for a Python int returned in a FloatType
    # field, and sum() of an empty (or all-int) input is an int.
    total = float(sum(some_vals or [])) * 50
    return [total, 4, 32]
#find_a_udf = F.udf(find_a, schema)

#CC.sqlContext.udf.register("find_a", find_a)

# def magic_method(func_name, col_names:list=[],df=None):
#     if len(col_names)==0:
#         raise ValueError("col_names list cannot be empty.")
#     tmp = ""
#     for col in col_names:
#         tmp += "F.collect_list({}{}{}){}".format('"',col,'"',",")
#     tmp = "{}{}{}{}".format(str(func_name), "(",tmp.rstrip(","), ")")
#     #tt = eval(tmp)
#     foobars = df.data.groupBy("timestamp", window("timestamp", "15 days")).agg(F.expr('find_a(collect_list("some_vals"))').alias("foobar"))
#     foobars.show()
#     cols = foobars.schema.fields
#     new_cols = []
#     for col in cols:
#         if col.name=="foobar":
#             for cl in foobars.schema.fields[2].dataType.names:
#                 new_cols.append("foobar."+cl)
#         else:
#             new_cols.append(col.name)
#     foobars.select(new_cols).show(truncate=False)

#magic_method(find_a, col_names=["some_vals"],df=location_stream)

In [14]:
# Also register the UDF under the SQL name "udfName" so it can be referenced from Spark SQL expressions.
CC.sqlContext.udf.register("udfName", udfName)
# Apply udfName to the "some_vals" column; the result (shown below) has one
# struct `merged_column` per user/version/window.
ds2 = location_stream.run_algorithm(udfName, columnNames=["some_vals"])
ds2.show(truncate=False)

+------------------------------------+-------+------------------------------------------+------------------+
|user                                |version|window                                    |merged_column     |
+------------------------------------+-------+------------------------------------------+------------------+
|00000000-afb8-476e-9872-6472b4e66b68|1      |[2019-01-09 17:41:00, 2019-01-09 17:42:00]|[2835.6885, 4, 32]|
|00000000-afb8-476e-9872-6472b4e66b68|1      |[2019-01-09 17:38:00, 2019-01-09 17:39:00]|[3339.4434, 4, 32]|
|00000000-afb8-476e-9872-6472b4e66b68|1      |[2019-01-09 17:44:00, 2019-01-09 17:45:00]|[3187.8225, 4, 32]|
|00000000-afb8-476e-9872-6472b4e66b68|1      |[2019-01-09 17:40:00, 2019-01-09 17:41:00]|[2784.6248, 4, 32]|
|00000000-afb8-476e-9872-6472b4e66b68|1      |[2019-01-09 17:49:00, 2019-01-09 17:50:00]|[2965.9531, 4, 32]|
|00000000-afb8-476e-9872-6472b4e66b68|1      |[2019-01-09 17:35:00, 2019-01-09 17:36:00]|[2795.9856, 4, 32]|
|00000000-afb8-476e

In [None]:
# Display the metadata attached to the imported stream.
location_stream.metadata

## Write a smoothing algorithm (stub: smoothing is not applied yet; only timestamps are returned)

In [None]:
schema = StructType([
    StructField("ts", TimestampType())
    ])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def smooth_df(data):
    """Grouped-map pandas UDF that (for now) returns each group's timestamps as column ``ts``.

    Despite the name, no smoothing is applied yet; see the TODO below.

    Args:
        data: pandas.DataFrame for one group; must contain a ``timestamp`` column.

    Returns:
        pandas.DataFrame with a single ``ts`` column and one row per input row,
        matching the ``ts: timestamp`` output schema declared above.
    """
    # TODO: apply the sketched smoothing, e.g. data['some_vals'].ewm(span = 3600).mean(),
    # and add the smoothed column to `schema`.
    # Build from a dict so the Series becomes a *column*. The previous
    # pd.DataFrame([data.timestamp], columns=['ts']) treated the Series as a single
    # row (its index labels became column names), so 'ts' came out as one NaN row.
    return pd.DataFrame({"ts": data["timestamp"]})

## Run smoothing algorithm on imported data

In [None]:
# Run the grouped-map UDF over the stream via its `compute` helper.
# Earlier variant that grouped explicitly by user first:
#smooth_stream = location_stream.groupby("user").compute(smooth_df)
smooth_stream = location_stream.compute(smooth_df)
# Fixed: a comma was missing between the positional and keyword arguments (SyntaxError).
smooth_stream.show(3, truncate=False)

In [None]:
import pyspark.sql.functions
from pyspark.sql.functions import minute, second, mean, window

@pandas_udf('double', PandasUDFType.SCALAR)
def fun_function(a):
    """Scalar pandas UDF that adds 2 to every value of a numeric column.

    Args:
        a: pandas.Series holding one batch of the input column.

    Returns:
        pandas.Series of doubles equal to ``a + 2``.
    """
    return a + 2

df = location_stream.data

#df.withColumn('d', fun_function(df.some_vals)).show()

# Standard deviation of some_vals per 5-minute window.
# Group by the window only: also grouping by "timestamp" puts each row in its own
# group. GroupedData has no `.stddev` method, so the aggregate goes through `.agg`.
df.groupBy(window("timestamp", "5 minutes")).agg(F.stddev("some_vals")).show(5, False)

# Bucket rows into fixed `interval`-second bins, keyed by the bin start formatted as a string.
interval = 60  # bin width in seconds
gdf = df.withColumn(
    'time_interval',
    F.from_unixtime(F.floor(F.unix_timestamp(df['timestamp']) / interval) * interval)
).groupBy('time_interval')
# NOTE(review): `gdf` is a GroupedData with no aggregation applied yet,
# e.g. gdf.agg(F.mean("some_vals")).

#df.groupBy(second("timestamp").alias("hour")).agg(mean("some_vals").alias("mean")).show()

## Visualize data (scratch: time-based grouping experiments on a toy DataFrame)

In [None]:
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.getOrCreate()

# Hand-made (value, ISO-8601 timestamp string) rows for experimenting with
# time-based grouping; two of them carry fractional seconds (.98 and .56).
# NOTE(review): "id" holds floats here, while some_udf's schema declares "id"
# as IntegerType — confirm which type is intended.
_toy_rows = [
    (17.00, "2018-03-10T15:27:18+00:00"),
    (13.00, "2018-03-11T12:27:18+00:00"),
    (25.00, "2018-03-12T11:27:18.98+00:00"),
    (20.00, "2018-03-13T15:27:18+00:00"),
    (17.00, "2018-03-14T12:27:18+00:00"),
    (99.00, "2018-03-15T11:27:17.56+00:00"),
    (156.00, "2018-03-22T11:27:18+00:00"),
    (17.00, "2018-03-31T11:27:18+00:00"),
    (25.00, "2018-03-15T11:27:18+00:00"),
    (25.00, "2018-03-16T11:27:18+00:00"),
]
df = (
    sparkSession.createDataFrame(_toy_rows, ["id", "ts"])
    # Replace the string "ts" column with its timestamp-typed cast.
    .withColumn("ts", F.col("ts").cast("timestamp"))
)
df.show(3)
schema = StructType([
    StructField("id", IntegerType()),
    StructField("ts", TimestampType())
])


@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def some_udf(df):
    """Placeholder grouped-map pandas UDF that returns the group's ``id`` and ``ts`` columns.

    Args:
        df: pandas.DataFrame for one group; must contain ``id`` and ``ts`` columns.

    Returns:
        pandas.DataFrame with columns ``id`` and ``ts`` (one row per input row),
        matching `schema` above.
    """
    # TODO: real per-group computation goes here.
    # Select columns directly. The previous pd.DataFrame([df["id"], df["ts"]], columns=...)
    # turned each Series into a *row*, so both requested columns came out as NaN.
    # NOTE(review): the toy "id" values are floats but `schema` says IntegerType;
    # cast explicitly (e.g. .astype("int32")) if Arrow rejects the conversion.
    return df[["id", "ts"]]

# Add a "date-hour-minute-second" string key derived from ts, presumably meant as a
# group-by key. Hour/minute/second are concatenated as-is (not zero-padded).
_ts = F.col("ts")
_key_parts = [
    _ts.cast("date"), F.lit("-"),
    F.hour(_ts), F.lit("-"),
    F.minute(_ts), F.lit("-"),
    F.second(_ts),
]
extended = df.withColumn("groupby_col", F.concat(*_key_parts).cast("string"))
extended.show(truncate=False)
#df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()