In [54]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
import numpy as np
np.set_printoptions(threshold=np.inf)

In [31]:
# Build the SparkSession for this notebook, or reuse one that is already
# running in the kernel (getOrCreate).
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# SparkContext handle, used by the RDD-based loading code in later cells.
sc = spark.sparkContext

(a)

In [74]:
def create_dataframe(fileName):
    """Load a tab-separated price file into a typed Spark DataFrame.

    The first line of the file is treated as the header and dropped. Each
    remaining line is split on tabs, its first field is discarded, and the
    line is kept only if exactly five fields remain. Fields are stripped of
    surrounding whitespace.

    Args:
        fileName: Path passed to ``sc.textFile``.

    Returns:
        A DataFrame with columns ``Price`` (float), ``Timestamp``
        (timestamp), ``InstanceType``, ``ProductDescription`` and
        ``AvailabilityZone`` (strings).

    Note:
        Relies on the notebook-level ``sc`` and ``spark`` objects. A file
        with no lines at all still fails at ``lines.first()``.
    """
    schema_price = ["Price", "Timestamp", "InstanceType",
                    "ProductDescription", "AvailabilityZone"]
    n_fields = len(schema_price)

    lines = sc.textFile(fileName)
    header = lines.first()

    rows = (
        lines
        .filter(lambda l: not str(l).startswith(header))  # drop header
        .map(lambda l: l.split("\t")[1:])                  # drop the first column
        .filter(lambda fields: len(fields) == n_fields)    # skip malformed lines
        .map(lambda fields: [f.strip() for f in fields])
    )

    # Pass an explicit all-string schema rather than a bare list of names.
    # With only names, Spark has to infer the types from the data, which
    # fails when no row survives the filters above (e.g. header-only file).
    schema = StructType(
        [StructField(field_name, StringType(), True) for field_name in schema_price]
    )
    df_price = spark.createDataFrame(rows, schema)

    # Convert the two non-string columns to their real types.
    df_price = df_price.withColumn("Price", col("Price").cast(FloatType()))
    df_price = df_price.withColumn(
        "Timestamp", to_timestamp("Timestamp", "yyyy-MM-dd'T'HH:mm:ssZ")
    )

    return df_price


def create_combination_dictionary(df):
    """Group rows by (InstanceType, ProductDescription, AvailabilityZone).

    Each row is turned into a ``(key, values)`` pair, where ``key`` is the
    three grouping columns joined with commas and ``values`` is a tuple of
    the remaining columns in their original order. Pairs are then collected
    per key.

    Args:
        df: DataFrame containing at least the three grouping columns.

    Returns:
        A DataFrame with a string column ``keys`` and an array column
        ``collect_list(values)`` holding one struct per input row.

    Note:
        Relies on the notebook-level ``spark`` session.
    """
    cond = ["InstanceType", "ProductDescription", "AvailabilityZone"]
    n_keys = len(cond)
    value_cols = [name for name in df.columns if name not in cond]

    # Put the key columns first and drop rows with a null key. This yields
    # the same rows and column order as grouping, counting and joining the
    # counts back onto ``df``, but without the extra shuffle and join.
    ordered = df.dropna(subset=cond).select(cond + value_cols)

    # Convert each key part to str *before* joining, so a non-string key
    # column cannot raise a TypeError during concatenation.
    pairs_rdd = ordered.rdd.map(
        lambda row: (
            ",".join(str(part) for part in row[:n_keys]),
            tuple(row[n_keys:]),
        )
    )

    pairs_df = spark.createDataFrame(pairs_rdd).toDF("keys", "values")
    pairs_df = pairs_df.groupby("keys").agg(collect_list("values"))
    return pairs_df

In [77]:
# Input file for this run (path is relative to the notebook's working directory).
filename = "dataset-EC2-series/prices-eu-central-1-2019-05-24.txt.gz"

# Load the file and preview the first 10 rows without truncating cell values.
df = create_dataframe(filename)
df.show(n=10, truncate=False)

+-------+-------------------+------------+------------------+----------------+
|Price  |Timestamp          |InstanceType|ProductDescription|AvailabilityZone|
+-------+-------------------+------------+------------------+----------------+
|0.2656 |2019-05-24 03:26:23|m5.2xlarge  |SUSE Linux        |eu-central-1c   |
|0.1656 |2019-05-24 03:26:23|m5.2xlarge  |Linux/UNIX        |eu-central-1c   |
|0.9046 |2019-05-24 03:25:54|c5n.9xlarge |SUSE Linux        |eu-central-1a   |
|0.8046 |2019-05-24 03:25:54|c5n.9xlarge |Linux/UNIX        |eu-central-1a   |
|23.2913|2019-05-24 03:25:50|x1e.32xlarge|Windows           |eu-central-1a   |
|0.1702 |2019-05-24 03:25:47|c5.xlarge   |SUSE Linux        |eu-central-1c   |
|0.0702 |2019-05-24 03:25:47|c5.xlarge   |Linux/UNIX        |eu-central-1c   |
|0.2326 |2019-05-24 03:16:57|c5n.2xlarge |SUSE Linux        |eu-central-1b   |
|0.1326 |2019-05-24 03:16:57|c5n.2xlarge |Linux/UNIX        |eu-central-1b   |
|2.0056 |2019-05-24 03:09:48|d2.8xlarge  |SUSE Linux

In [78]:
# Collect the rows of `df` per instance-type / product / zone key and
# preview the first 10 keys without truncating cell values.
pair_df = create_combination_dictionary(df)
pair_df.show(n=10, truncate=False)

+-------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

(b)

In [79]:
def save_timeseries()

DataFrame[keys: string, collect_list(values): array<struct<_1:double,_2:timestamp>>]