In [1]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace,rand,col
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType, TimestampType
import matplotlib.pyplot as plt
import random
import pandas as pd
import inspect

# Task 8

## Exercise 1

The binomial coefficients are the coefficients of a polynomial expansion. To see this, look at the following example:
$$(1+x)^4=\binom{4}{0}x^01^4+\binom{4}{1}x^11^3+\binom{4}{2}x^21^2+\binom{4}{3}x^31^1+\binom{4}{4}x^41^0$$

Applying this to the task at hand (setting $x=1$) gives us:
$$2^N=(1+1)^N=\sum_{k=0}^{N}\binom{N}{k}1^{N-k}1^{k}=\sum_{k=0}^{N}\binom{N}{k}\cdot 1=\sum_{k=0}^{N}\binom{N}{k}$$
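
The identity can also be checked numerically for small $N$. The following is a minimal sketch; the helper name `binomial_row_sum` is chosen here for illustration only.

```python
from math import comb


def binomial_row_sum(n: int) -> int:
    """Return C(n, 0) + C(n, 1) + ... + C(n, n)."""
    return sum(comb(n, k) for k in range(n + 1))


# Compare the sum of binomial coefficients with 2^N for a few values of N.
for n in range(11):
    assert binomial_row_sum(n) == 2 ** n

print(binomial_row_sum(4))  # 1 + 4 + 6 + 4 + 1 = 16
```

This is only a sanity check for finitely many values; the derivation above is what proves the statement for all $N$.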

## Exercise 2

### a)

An item is frequent if it appears in at least 5 baskets. Item $i$ appears in every basket whose number is a multiple of $i$, so $5 \cdot i$ must be smaller than or equal to 100. Hence,
the numbers from 1 to 20 are frequent.

### b)

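All pairs of numbers (x,y) whose least common multiple is at most 20: both items appear together exactly in the baskets that are multiples of lcm(x,y), and at least 5 such baskets must exist up to 100.
(4,5) -> 20,40,60,80,100 [included]
(3,7) -> 21,42,63,84 [excluded]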

### c)

Item 1 is included in 100 baskets, item 2 in 100/2 = 50 baskets, item 3 in $\lfloor 100/3 \rfloor = 33$ baskets, and so on.
Hence, the total sum can be expressed by:
$$\text{sumBasketSizes} = \sum_{k=1}^{100} \left\lfloor \frac{100}{k} \right\rfloor$$
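
The sum can be evaluated directly and compared with an explicit construction of the baskets. This sketch assumes the setup implied above (100 items, 100 baskets, item `i` is in basket `b` exactly when `i` divides `b`); the variable names are illustrative.

```python
# Build every basket explicitly: basket b holds all divisors of b.
baskets = {b: [i for i in range(1, 101) if b % i == 0] for b in range(1, 101)}

# Sum of basket sizes, counted basket by basket.
total_by_baskets = sum(len(items) for items in baskets.values())

# The same quantity, counted item by item with the floor formula.
total_by_formula = sum(100 // k for k in range(1, 101))

print(total_by_baskets, total_by_formula)  # 482 482
```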

### d)

The confidence of a rule $I \rightarrow J$ is defined as follows:
$\frac{support(I\cup J)}{support(I)}$

So for $R_1$:
$support(I) = 2$
$support(I\cup J)= 1$
Hence the confidence is 0.5

So for $R_2$:
$support(I)= 8$
$support(I\cup J)= 1$
Hence the confidence is 0.125
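
The definition translates into a few lines of code. The rules $R_1$ and $R_2$ are not spelled out in this notebook, so the itemsets below are assumptions: they were picked because they reproduce the supports 2/1 and 8/1 used above, under the same divisibility setup as in the earlier parts.

```python
def support(itemset):
    """Count baskets 1..100 containing every item (item i is in basket b iff i divides b)."""
    return sum(1 for b in range(1, 101) if all(b % i == 0 for i in itemset))


def confidence(i_set, j_set):
    """Confidence of the rule I -> J: support(I u J) / support(I)."""
    return support(set(i_set) | set(j_set)) / support(i_set)


# Assumed example rules (not taken from the text above).
print(confidence({5, 7}, {2}))     # 0.5
print(confidence({2, 3, 4}, {5}))  # 0.125
```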

## Exercise 3

After 1st pass:
C_1={{1},{2},{3},{4},{5},{6},{7},{8},{9},{10},{11},{12},{13},{14},{15},{16},{17},{18},{19},{20}}
non frequents (21-100)

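After 2nd pass (all pairs whose least common multiple is at most 20, 56 pairs in total):
C_2={{1,2}, {1,3}, {1,4}, {1,5}, {1,6}, {1,7}, {1,8}, {1,9}, {1,10}, {1,11}, {1,12}, {1,13}, {1,14}, {1,15}, {1,16}, {1,17}, {1,18}, {1,19}, {1,20},
       {2,3}, {2,4}, {2,5}, {2,6}, {2,7}, {2,8}, {2,9}, {2,10}, {2,12}, {2,14}, {2,16}, {2,18}, {2,20},
       {3,4}, {3,5}, {3,6}, {3,9}, {3,12}, {3,15}, {3,18},
       {4,5}, {4,6}, {4,8}, {4,10}, {4,12}, {4,16}, {4,20},
       {5,10}, {5,15}, {5,20},
       {6,9}, {6,12}, {6,18},
       {7,14}, {8,16}, {9,18}, {10,20}}

After 3rd pass (all triples whose least common multiple is at most 20, 72 triples in total):
C_3 contains {1,x,y} for each of the 37 pairs {x,y} in C_2 that do not contain 1, and in addition:
      {2,3,4}, {2,3,6}, {2,3,9}, {2,3,12}, {2,3,18},
      {2,4,5}, {2,4,6}, {2,4,8}, {2,4,10}, {2,4,12}, {2,4,16}, {2,4,20},
      {2,5,10}, {2,5,20}, {2,6,9}, {2,6,12}, {2,6,18},
      {2,7,14}, {2,8,16}, {2,9,18}, {2,10,20},
      {3,4,6}, {3,4,12}, {3,5,15}, {3,6,9}, {3,6,12}, {3,6,18}, {3,9,18},
      {4,5,10}, {4,5,20}, {4,6,12}, {4,8,16}, {4,10,20},
      {5,10,20}, {6,9,18}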
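
The passes can be cross-checked by brute force. The following is a minimal sketch of A-Priori-style candidate generation and counting, assuming the divisibility baskets from Exercise 2 and a support threshold of 5; the names `BASKETS`, `count_support` and `apriori` are illustrative.

```python
from itertools import combinations

THRESHOLD = 5
# Assumed setup: basket b contains item i iff i divides b.
BASKETS = [frozenset(i for i in range(1, 101) if b % i == 0) for b in range(1, 101)]


def count_support(itemset):
    """Number of baskets that contain every item of the itemset."""
    return sum(1 for basket in BASKETS if itemset <= basket)


def apriori(max_size):
    """Return {k: list of frequent k-itemsets} for k = 1..max_size."""
    frequent = {1: [frozenset([i]) for i in range(1, 101)
                    if count_support(frozenset([i])) >= THRESHOLD]}
    for k in range(2, max_size + 1):
        prev = set(frequent[k - 1])
        items = sorted(set().union(*prev))
        # Keep a candidate only if all of its (k-1)-subsets were frequent.
        candidates = [frozenset(c) for c in combinations(items, k)
                      if all(frozenset(s) in prev for s in combinations(c, k - 1))]
        frequent[k] = [c for c in candidates if count_support(c) >= THRESHOLD]
    return frequent


result = apriori(3)
print([len(result[k]) for k in (1, 2, 3)])  # [20, 56, 72]
```

Printing `sorted(sorted(s) for s in result[2])` lists the frequent pairs explicitly, which makes it easy to compare against a hand-written list.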

## Exercise 4

### a)

The full matrix of pairs has $I^2$ entries. The $I$ diagonal entries are not needed, since an item does not form a pair with itself, which leaves $I^2-I$ entries.
Because the pair $(i,j)$ is the same as the pair $(j,i)$, only one half of these has to be stored, which gives $\frac{I^2-I}{2}$ entries.
Multiplied by 4 bytes per entry we get $2I^2-2I$ bytes of storage.
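
To make the triangular layout concrete, the sketch below maps each pair to a slot in a one-dimensional array of length $\frac{I^2-I}{2}$. The 0-based indexing and the function name `triangular_index` are choices made here for illustration, not part of the exercise.

```python
def triangular_index(i: int, j: int, n: int) -> int:
    """Slot of the pair (i, j) with 0 <= i < j < n in a flat triangular array."""
    if not 0 <= i < j < n:
        raise ValueError("expected 0 <= i < j < n")
    # Rows 0..i-1 occupy (n-1) + (n-2) + ... + (n-i) slots in total.
    return i * n - i * (i + 1) // 2 + (j - i - 1)


n = 5
slots = [triangular_index(i, j, n) for i in range(n) for j in range(i + 1, n)]
print(slots)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Every pair gets its own slot and no slot is wasted, so the array needs exactly $n(n-1)/2$ counters.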

### b)

This is exactly the number of entries we calculated before, hence the maximum number is $\frac{I^2-I}{2}$.


### c)

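If fewer than 1/3 of the possible pairs actually occur, the triples method saves storage: it stores three integers $(i, j, count)$ for each occurring pair, which is 3 times the storage of a single entry in the triangular matrix.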
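
The break-even point can be illustrated with a small calculation. This sketch assumes 4 bytes per integer, as in part a); the function names and the example value of 100 items are illustrative.

```python
def triangular_bytes(num_items: int, bytes_per_int: int = 4) -> int:
    """Storage for one counter per possible pair."""
    return bytes_per_int * num_items * (num_items - 1) // 2


def triples_bytes(num_occurring_pairs: int, bytes_per_int: int = 4) -> int:
    """Storage for one (i, j, count) triple per pair that actually occurs."""
    return 3 * bytes_per_int * num_occurring_pairs


num_items = 100
possible_pairs = num_items * (num_items - 1) // 2  # 4950
break_even = possible_pairs // 3                   # 1650

print(triangular_bytes(num_items))    # 19800
print(triples_bytes(break_even - 1))  # 19788, cheaper than the triangular matrix
print(triples_bytes(break_even))      # 19800, no saving any more
```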



## Exercise 5

First we create a Spark session and context to work with.

In [2]:
spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
sc = spark.sparkContext


Next we define our subroutine for reading in a file. It drops the Type column and casts Price to a double and Timestamp to a timestamp.

In [3]:
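def read_in_ec2_file(path):
    """Read a tab-separated EC2 price file into a Spark DataFrame."""
    # All columns are read as strings first; the header row provides the column names.
    df = spark.read.csv(path, sep='\t', header=True)
    # The Type column is not needed for the analysis.
    df = df.drop("Type")
    # Cast the remaining non-string columns to their proper types.
    df = df.withColumn("Price", df.Price.cast(DoubleType()))
    df = df.withColumn("Timestamp", df.Timestamp.cast(TimestampType()))
    return df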

Lastly we want to test our implementation on an example file:

In [18]:
test_file_name = 'prices-eu-central-1-2019-05-24.txt.gz'
file_path = 'data_sheet8/'
full_test_path = file_path + test_file_name

print("Reading in the file")
dataframe_read = read_in_ec2_file(full_test_path)

print("Printing its schema")
dataframe_read.printSchema()

print("Performing some benchmark")
print("First we determine all unique pairs")
# Collect every distinct (InstanceType, ProductDescription) combination on the driver.
unique_pairs = dataframe_read.select('InstanceType', 'ProductDescription').dropDuplicates().collect()
for instance_type, product in unique_pairs:
    # Average price over the rows of this pair; this runs one Spark job per pair.
    subset = dataframe_read.filter(
        (dataframe_read.InstanceType == instance_type) & (dataframe_read.ProductDescription == product)
    ).agg({"Price": "avg"})
    print(f"For the pair: ({instance_type} - {product}) with an average of: {subset.first()[0]}")

Reading in the file
Printing its schema
root
 |-- Price: double (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- InstanceType: string (nullable = true)
 |-- ProductDescription: string (nullable = true)
 |-- AvailabilityZone: string (nullable = true)

Performing some benchmark
First we determine all unique pairs
For the pair: (r3.2xlarge - Linux/UNIX) with an average of: 0.146915137614679
