In [2]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for root_dir, _subdirs, file_names in os.walk('/kaggle/input'):
    # print the full path of every file found under the Kaggle input directory
    for file_name in file_names:
        print(os.path.join(root_dir, file_name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## 1. Installation

In [3]:
!pip install pyspark

## 2. Spark Session

In [4]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one, otherwise start one with default settings;
# later cells use `spark` as the entry point (e.g. spark.createDataFrame).
spark = SparkSession.builder.getOrCreate()

## 3. Dummy data creation

In [5]:
# One tuple per employee in the column order below; each tuple is zipped with the
# column names to produce the list-of-dicts form passed to spark.createDataFrame.
sampleData = [
    dict(zip(("employee", "department", "state", "salary", "age", "bonus"), record))
    for record in (
        ("James", "Sales", "NY", 90000, 34, 10000),
        ("Michael", "Sales", "NY", 86000, 56, 20000),
        ("Robert", "Sales", "CA", 81000, 39, 23000),
        ("Maria", "Finance", "CA", 90000, 24, 23000),
        ("Raman", "Finance", "CA", 99000, 40, 24000),
        ("Scott", "Finance", "NY", 83000, 36, 19000),
        ("Jen", "Finance", "NY", 79000, 53, 15000),
        ("Jeff", "Marketing", "CA", 80000, 25, 18000),
        ("Kumar", "Marketing", "NY", 91000, 50, 21000),
    )
]


## 4. Create a Spark DataFrame and show it

In [6]:
# Build the DataFrame from the in-memory list of dicts (no explicit schema is passed).
# To load a CSV file instead: df = spark.read.csv("<file_path>")
df = spark.createDataFrame(data=sampleData)
df.show()

### Show the first rows as a pandas DataFrame

In [7]:
# Bring only the first 5 rows to the driver and render them as a pandas DataFrame
df.limit(5).toPandas()

### print schema

In [8]:
# Print the DataFrame's schema (column names and types) as a tree
df.printSchema()

### rename columns

In [9]:
# rename one column (employee -> employee_name); withColumnRenamed returns a new DataFrame
df = df.withColumnRenamed(existing="employee", new="employee_name")
df.printSchema()

## to rename every column at once, pass the new names positionally to toDF:
# df = df.toDF(*["age_xy", "bonus_xy", "department_xy", "employee_xy", "salary_xy", "state_xy"])

## select columns

In [10]:
# keep only three columns, passed as a list
df1 = df.select(["employee_name", "age", "state"])
df1.show()

## sort

In [11]:
## orderBy (an alias of sort) is ascending unless told otherwise
df.orderBy('age').show()

In [12]:
## descending order: switch off the default ascending flag
from pyspark.sql import functions as F

df.orderBy('age', ascending=False).show()

## create a new column using Spark UDF

In [13]:
# normal python function
def salary_in_k(x):
    """Convert a salary to thousands, e.g. 90000 -> 90.0.

    When used as a Spark UDF, a SQL NULL arrives here as ``None``; return ``None``
    for it (so the UDF yields NULL) instead of failing the task with a TypeError.
    """
    if x is None:
        return None
    return x / 1000

# wrap the plain Python function as a Spark UDF; the UDF's return type
# (DoubleType) has to be declared explicitly
from pyspark.sql import types as T

salary_in_k_udf = F.udf(salary_in_k, returnType=T.DoubleType())

# creating the column: apply the UDF to each value of `salary`
df = df.withColumn('salary_in_k', salary_in_k_udf('salary'))

df.show()
                   

## Filter

In [14]:
# keep rows with age >= 50 AND salary_in_k >= 80.0
df.where((df['age'] >= 50) & (df['salary_in_k'] >= 80.0)).show()

## groupBy

In [15]:
# total salary per department (dict form of agg: column -> aggregate name)
df.groupBy('department').agg({'salary': 'sum'}).show()

In [16]:
# total salary and bonus per (department, state) pair
df.groupBy(['department', 'state']).agg(F.sum('salary'), F.sum('bonus')).show()

In [17]:
# several aggregations computed in one groupBy
(
    df.groupBy('department')
    .agg(
        F.sum('salary'),
        F.avg('salary'),
        F.min('bonus'),
        F.max('bonus'),
    )
    .show()
)

In [18]:
# alias each aggregate, then filter on the aggregated (aliased) columns
(
    df.groupBy('department')
    .agg(
        F.sum('salary').alias('sum_salary'),
        F.avg('salary').alias('avg_salary'),
        F.min('bonus').alias('min_bonus'),
        F.max('bonus').alias('max_bonus'),
    )
    .filter(F.col('min_bonus') >= 15000)
    .show()
)

In [19]:
## collect each department's states into a list column named `state_list`
# `.alias` must wrap the aggregate expression; calling it on the resulting DataFrame
# only aliases the DataFrame and leaves the column named `collect_list(state)`.
df.groupBy('department').agg(F.collect_list('state').alias('state_list')).show()

## Joins

In [20]:
# two projections of `df` that share the join keys (employee_name, age)
df1 = df.select("employee_name", "age", "state", "salary_in_k")
df2 = df.select("salary", "department", "bonus", "employee_name", "age")
# show each one as its own statement; `df1.show(), df2.show()` would also echo a
# meaningless `(None, None)` tuple as the cell's output
df1.show()
df2.show()

In [21]:
# inner join on the shared key columns (other options: how='left' / how='right')
df = df1.join(df2, on=["employee_name", "age"], how="inner")
df.show()

In [22]:
# when one side is small, broadcast it so the join happens map-side
# instead of shuffling the large side
df = df1.join(other=F.broadcast(df2), on=["employee_name", "age"], how="inner")
df.show()

## Use SQL with DataFrame

In [26]:
# register the df as a temporary view so SQL can query it by name;
# createOrReplaceTempView supersedes the deprecated registerTempTable
df.createOrReplaceTempView('df_table')

# SparkSession.sql runs the query directly; the deprecated SQLContext wrapper
# (which was being handed a SparkSession instead of a SparkContext) is not needed
spark.sql('select * from df_table where age >= 50').show()


## Using RDD

In [28]:
import math
from pyspark.sql import Row

def row_function(row):
    """Return a new Row with every field of ``row`` plus ``log_bonus``.

    ``log_bonus`` is the natural logarithm of the row's ``bonus`` value, computed with
    ``np.log`` and converted to a Python float. (The field was previously labelled
    ``exp_bonus`` even though it holds a logarithm, not an exponential.)
    """
    # Row -> plain dict so a new key can be added
    fields = row.asDict()
    fields['log_bonus'] = float(np.log(fields['bonus']))

    # dict -> Row again
    return Row(**fields)

# DataFrame -> RDD of Row objects
df_rdd = df.rdd

# transform every Row with row_function (passed directly, no wrapper lambda)
df_rdd_new = df_rdd.map(row_function)

# RDD of Rows -> DataFrame
df_new = spark.createDataFrame(df_rdd_new)

df_new.show()

## Using Pandas UDF

In [29]:
# Inspect the joined DataFrame's columns and types; the output schema declared below mirrors them
df.printSchema()

In [33]:
## mean-centre each bonus within its department (bonus minus the department's mean bonus, stored as `normalized_bonus`)

# Output schema of the grouped pandas function: the input columns plus
# `normalized_bonus`, every field nullable.
outSchema = T.StructType([
    T.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('employee_name', T.StringType()),
        ('age', T.LongType()),
        ('state', T.StringType()),
        ('salary', T.LongType()),
        ('salary_in_k', T.DoubleType()),
        ('department', T.StringType()),
        ('bonus', T.LongType()),
        ('normalized_bonus', T.DoubleType()),
    ]
])

# `subtract_mean` is a plain pandas function handed to `applyInPandas` (Spark >= 3.0),
# which replaces the deprecated `pandas_udf(..., PandasUDFType.GROUPED_MAP)` +
# `groupby().apply()` combination.
def subtract_mean(pdf):
    """Return one department's rows with a ``normalized_bonus`` column added.

    ``pdf`` is a pandas DataFrame holding the rows of a single department.
    ``normalized_bonus`` is each row's ``bonus`` minus that group's mean bonus
    (centred, not scaled). A new frame is returned; ``pdf`` is left unmodified.
    """
    return pdf.assign(normalized_bonus=pdf['bonus'] - pdf['bonus'].mean())

# the frame returned by subtract_mean must match `outSchema`
confirmed_groupwise_normalization = df.groupby("department").applyInPandas(
    subtract_mean, schema=outSchema
)

confirmed_groupwise_normalization.toPandas()

## MLlib

In [34]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer

# label-encode each categorical string column into a numeric "<column>_idx" column
indexers = [
    StringIndexer(inputCol=col_name, outputCol="{}_idx".format(col_name))
    for col_name in ("department", "employee_name", "state")
]

# run all three indexers as the stages of a single pipeline
pipeline = Pipeline(stages=indexers)

# fit the indexers on df, then append the *_idx columns to it
indexed = pipeline.fit(df).transform(df)
indexed.show()

In [36]:
## correlation matrix
from pyspark.ml.stat import Correlation

# columns to correlate: everything except the raw string columns, kept in the
# DataFrame's own order (a set difference yields a hash-randomized order that
# changes from run to run)
vector_col = 'corr_features'
corr_cols = [c for c in indexed.columns if c not in ("department", "employee_name", "state")]

# pack the selected columns into the single vector column Correlation.corr expects
assembler = VectorAssembler(inputCols=corr_cols, outputCol=vector_col)
indexed_vector = assembler.transform(indexed).select(vector_col)

# get correlation matrix (Pearson by default)
matrix = Correlation.corr(indexed_vector, vector_col)

# toArray() gives the square (n x n) ndarray directly, rather than relying on the
# flattened column-major `.values` buffer
result = matrix.collect()[0]['pearson({})'.format(vector_col)].toArray()
result

In [37]:
# correlation matrix as a pandas DataFrame labelled with the feature names on both axes
pd.DataFrame(data=result.reshape(-1, len(corr_cols)), index=corr_cols, columns=corr_cols)