# Spark assignments

In [2]:
from pyspark.sql import SparkSession
import numpy as np
from pyspark.mllib.stat import Statistics
import pandas as pd
import time

In [3]:
# Reuse the active Spark session if there is one; otherwise start a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

### 1. Given a target column and a dataset as input, compute the correlation between that
### target column and all the features in the dataset.

In [14]:
# Load the house-price CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
house_csv_path = '/Users/xue/Desktop/Farrago/Datasets/house price demo/kc_house_data.csv'
df = spark.read.options(header=True, inferSchema=True).csv(house_csv_path)

In [17]:
def correlation(df, target_col):
    """Compute the Pearson correlation between ``target_col`` and every numeric column.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input dataset. Only columns whose Spark type is numeric or boolean take
        part in the computation; every other column (strings, dates,
        timestamps, nested types, ...) is dropped first, because the rows are
        converted to float vectors and such values cannot be converted.
    target_col : str
        Name of the column to correlate against. It must be one of the columns
        that survive the numeric filter.

    Returns
    -------
    pandas.Series
        Correlation coefficients indexed by column name and named
        ``target_col``. The entry for ``target_col`` itself is included.

    Raises
    ------
    KeyError
        If ``target_col`` is not a numeric column of ``df`` (missing or of a
        non-numeric type).
    """
    # DataFrame.dtypes reports types as simple strings such as 'int', 'bigint'
    # or 'decimal(10,2)'. Match exact names (a bare startswith('int') would
    # also accept 'interval') and treat decimal by prefix because it carries
    # its precision and scale in the name.
    numeric_types = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double', 'boolean'}

    def _is_numeric(dtype):
        return dtype in numeric_types or dtype.startswith('decimal')

    # drop every column that cannot be turned into a float
    columns_to_drop = [name for name, dtype in df.dtypes if not _is_numeric(dtype)]
    df_numeric = df.drop(*columns_to_drop)
    numeric_cols = df_numeric.columns

    # Validate before launching the Spark job so a bad target fails fast and
    # with an explanatory message instead of a bare KeyError at the very end.
    if target_col not in numeric_cols:
        raise KeyError(
            "target column {!r} is not a numeric column of the dataset; "
            "numeric columns are: {}".format(target_col, numeric_cols)
        )

    # generate correlation matrix (one plain tuple of values per row)
    features = df_numeric.rdd.map(tuple)
    corr_mat = Statistics.corr(features, method="pearson")

    # label the matrix with the column names and keep only the target's column
    corr_df = pd.DataFrame(corr_mat, index=numeric_cols, columns=numeric_cols)
    return corr_df[target_col]

In [18]:
# Time the end-to-end correlation job. perf_counter() is a monotonic,
# high-resolution clock, so the measured interval cannot be skewed by
# system clock adjustments the way time.time() differences can.
start = time.perf_counter()
print(correlation(df, 'price'))
end = time.perf_counter()
print(end-start)

id              -0.016762
price            1.000000
bedrooms         0.308350
bathrooms        0.525138
sqft_living      0.702035
sqft_lot         0.089661
floors           0.256794
waterfront       0.266369
view             0.397293
condition        0.036362
grade            0.667434
sqft_above       0.605567
sqft_basement    0.323816
yr_built         0.054012
yr_renovated     0.126434
zipcode         -0.053203
lat              0.307003
long             0.021626
sqft_living15    0.585379
sqft_lot15       0.082447
Name: price, dtype: float64
3.3591129779815674


### Assignment 2 
#### This will be a little more challenging. At the time, we implemented an outlier detection module to remove outliers from our data.
#### We never implemented that for Big Data, and it’s now time to do that with Spark.
#### Before implementing anything, let’s do a little bit of research. This is a good starting point:
#### https://towardsdatascience.com/a-brief-overview-of-outlier-detection-techniques-1e0b2c19e561

#### Once you find a nice outlier detection technique, just tell me and we will decide which one to implement. If feasible, we can also decide to implement more than one in order to make our future framework richer.

### Assignment 3
### rotate an image
#### This is more of an experiment. Spark is apparently not well developed for dealing with large datasets of images. I just want you to apply a simple rotation of 90 degrees to an image by using this module, if possible.

In [28]:
from pyspark.ml.image import ImageSchema
# Path of the image to load.
# NOTE(review): despite the `_dir` suffix, this literal points at a single .jpg file.
img_dir = '/Users/xue/Desktop/1.jpg'

In [29]:
# Read the file at img_dir through Spark's "image" data source.
image = (
    spark.read
    .format("image")
    .load(img_dir)
)

In [32]:
# Display the image's metadata fields; truncate=False keeps the full origin path visible.
metadata_fields = ["image.origin", "image.width", "image.height", "image.nChannels"]
image.select(*metadata_fields).show(truncate=False)

+-------------------------------+-----+------+---------+
|origin                         |width|height|nChannels|
+-------------------------------+-----+------+---------+
|file:///Users/xue/Desktop/1.jpg|556  |353   |3        |
+-------------------------------+-----+------+---------+



In [25]:
# Print the DataFrame's schema as a tree to see the fields nested under the `image` struct.
image.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)



In [35]:
# Select the nested mode field of the image struct. As the cell's last expression
# this only echoes the DataFrame's repr (`DataFrame[Mode: int]` in the recorded
# output); no row values are displayed.
# NOTE(review): the printed schema lists the field as lowercase `mode`, yet the
# `Mode` spelling resolved here — presumably Spark's case-insensitive name
# resolution; confirm.
image.select('image.Mode')

DataFrame[Mode: int]

In [40]:
# Build a session configured to pull the MMLSpark package, then import its Python module.
# NOTE(review): `spark` was already created earlier in this notebook. getOrCreate()
# is documented to return an existing session when there is one, so the
# spark.jars.packages setting below presumably never takes effect — likely the
# reason the import fails with ModuleNotFoundError in the recorded output.
# Confirm by applying this config to the first session after a kernel restart.
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:0.18.1") \
            .getOrCreate()
import mmlspark

ModuleNotFoundError: No module named 'mmlspark'