### Column Operations
- Adding column
- Renaming column
- Removing column

In [None]:
#Imports and load data
from pyspark.sql import SparkSession
from pyspark.sql import Window as W
from pyspark.sql.functions import * # Needed for Filters like When, Like etc.

# Build (or reuse) a Spark session named "penguins" with master "local[4]".
session_builder = SparkSession.builder.appName("penguins").master("local[4]")
spark = session_builder.getOrCreate()

# Read the penguins CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
df_penguins = spark.read.csv(
    "data/penguins.csv",
    header=True,
    inferSchema=True,
)

# From this point on, only log messages at ERROR level.
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Summary statistics for the penguins dataframe, printed as a table.
penguin_stats = df_penguins.describe()
penguin_stats.show()

In [None]:
# Group the rows by "sex" and count how many rows fall in each group.
rows_per_sex = df_penguins.groupBy("sex").count()
rows_per_sex.show()

In [None]:
# Largest bill length per "sex" group, exposed as column "max_BL".
# NOTE: `max` here is the Spark column function pulled in by the star import
# of pyspark.sql.functions, not Python's builtin max.
longest_bill = max("bill_length_mm").alias("max_BL")
max_bill_by_sex = df_penguins.groupBy("sex").agg(longest_bill)
max_bill_by_sex.show()

# The same pattern works for min, mean and sum.

In [None]:
# For each island, collect the species values found there into a single
# set-valued column named "set".
species_on_island = collect_set("species").alias("set")
species_per_island = df_penguins.groupBy("island").agg(species_on_island)
species_per_island.show()

In [None]:
# Body-mass quartiles per "sex" group, computed with SQL expressions.
# Each entry is (percentile as written in the SQL text, output column name).
quartile_specs = [("0.25", "%25"), ("0.50", "%50"), ("0.75", "%75")]

# percentile(..., array(p)) yields an array, so [0] picks out its single element.
quartile_columns = [
    expr(f"percentile(body_mass_g, array({fraction}))")[0].alias(label)
    for fraction, label in quartile_specs
]

df_penguins.groupBy("sex").agg(*quartile_columns).show()

In [None]:
# Show only the rows whose "sex" value is null.
sex_is_missing = isnull(df_penguins["sex"])
rows_missing_sex = df_penguins.filter(sex_is_missing)
rows_missing_sex.show()

In [None]:
# Use a window to add an index column: rows are numbered within each
# (island, species) partition, ordered by bill length.
partition_columns = ("island", "species")
window = W.partitionBy(*partition_columns).orderBy("bill_length_mm")

# NOTE: df_penguins is rebound here, so later cells see the extra "row" column.
row_index = row_number().over(window)
df_penguins = df_penguins.withColumn("row", row_index)
df_penguins.show(6)


In [None]:
from pyspark.ml.feature import Bucketizer

# Small hand-made dataset of (name, age) pairs.
values = [
    ("a", 23),
    ("b", 45),
    ("c", 10),
    ("d", 60),
    ("e", 56),
    ("f", 2),
    ("g", 25),
    ("h", 40),
    ("j", 33),
]
df = spark.createDataFrame(values, ["name", "ages"])

# Split points for the age buckets; the last bucket is open-ended upwards.
age_splits = [0, 6, 18, 60, float('Inf')]
bucketizer = Bucketizer(splits=age_splits, inputCol="ages", outputCol="buckets")

# "keep" retains rows with invalid values instead of raising an error.
bucketizer.setHandleInvalid("keep")
df_buck = bucketizer.transform(df)

df_buck.show()


In [None]:
import numpy as np
import pandas as pd
import pyspark
import os
import urllib
import sys

from pyspark.sql.functions import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.feature import *

# Train and evaluate a multiclass logistic regression that predicts the
# penguin species from four numeric body measurements.

# start Spark session (getOrCreate reuses a session that is already running)
spark = pyspark.sql.SparkSession.builder.appName('Iris').getOrCreate()

# regularization rate handed to LogisticRegression(regParam=...);
# 0.01 is a small default chosen for this demo -- tune as needed
reg = 0.01

# print runtime versions
print('****************')
print('Python version: {}'.format(sys.version))
print('Spark version: {}'.format(spark.version))
print('****************')

# load penguins.csv into Spark dataframe and keep the four numeric
# measurements plus the species label (label column last)
df_penguins = spark.read.csv("data/penguins.csv", header=True, inferSchema=True)
data = df_penguins.select("bill_length_mm", "bill_depth_mm", "flipper_length_mm", "body_mass_g", "species")
data.show(10)

# VectorAssembler raises on null inputs with its default handleInvalid="error",
# so drop rows with a missing measurement or label before assembling
data = data.dropna()

# vectorize all numerical columns (everything except the trailing label
# column) into a single feature column
feature_cols = data.columns[:-1]
assembler = pyspark.ml.feature.VectorAssembler(inputCols=feature_cols, outputCol='features')
data = assembler.transform(data)

# convert text labels into indices
data = data.select(['features', 'species'])
label_indexer = pyspark.ml.feature.StringIndexer(inputCol='species', outputCol='label').fit(data)
data = label_indexer.transform(data)
data.show()

# only select the features and label column
data = data.select(['features', 'label'])
print("Reading for machine learning")

# use Logistic Regression to train on the training set (70/30 random split)
train, test = data.randomSplit([0.70, 0.30])
lr = pyspark.ml.classification.LogisticRegression(regParam=reg)
model = lr.fit(train)

# predict on the test set
prediction = model.transform(test)
print("Prediction")
prediction.show(10)

# evaluate the accuracy of the model using the test set
evaluator = pyspark.ml.evaluation.MulticlassClassificationEvaluator(metricName='accuracy')
accuracy = evaluator.evaluate(prediction)

print()
print('#####################################')
print('Regularization rate is {}'.format(reg))
print("Accuracy is {}".format(accuracy))
print('#####################################')
print()

print('Accuracy', accuracy)