# ISA 414 - Managing Big Data
## Lecture 24 – Spark (Part II)

#### Slide 20

In [None]:
# Location of the CSV file to load
file_location = "/FileStore/tables/noshow.csv"

# Configure the CSV reader:
#   header      := whether the first row contains column names
#   inferSchema := let Spark detect the data type of each column
csv_reader = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
)
raw_data = csv_reader.load(file_location)

# raw_data is left untouched because it is used again later;
# every transformation below is applied through df instead
df = raw_data

# Display the first 20 rows
df.show()

#### Slide 21

In [None]:
# Column names paired with the data type detected for each of them
df.dtypes

In [None]:
# Number of rows for each distinct value of Status
status_counts = df.groupBy("Status").count()
status_counts.show()

#### Slide 22

In [None]:
# Collapse the seven day names in DayOfTheWeek into two categories
weekdays = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
weekend_days = ["Saturday", "Sunday"]

# day name -> category, applied in a single replace() restricted to DayOfTheWeek
day_category = {day: "WeekDay" for day in weekdays}
day_category.update({day: "WeekEnd" for day in weekend_days})

df = df.replace(day_category, subset=["DayOfTheWeek"])

In [None]:
# Number of rows for each distinct value of DayOfTheWeek
day_counts = df.groupBy("DayOfTheWeek").count()
day_counts.show()

#### Slide 23

In [None]:
# Summary statistics for the Age column
age_summary = df.select("Age").summary()
age_summary.show()

In [None]:
# Keep only the rows whose Age is non-negative.
# The column is referenced with its exact name ("Age", as used in the other cells)
# so the lookup does not rely on Spark's case-insensitive column resolution.
df = df.filter(df["Age"] >= 0)

#### Slide 24

In [None]:
from pyspark.ml.feature import StringIndexer

# String columns to encode, mapped to the name of the numeric index column
# that StringIndexer produces for each of them.
# note that StatusIndex is now our target
index_columns = {
    'Gender': 'GenderIndex',
    'DayOfTheWeek': 'DayOfTheWeekIndex',
    'Status': 'StatusIndex',
}

for input_col, output_col in index_columns.items():
    # Drop any previous copy of the output column (a no-op the first time) so
    # this cell can be re-run without failing on an already existing column.
    df = df.drop(output_col)
    stringToIndex = StringIndexer(inputCol=input_col, outputCol=output_col)
    df = stringToIndex.fit(df).transform(df)

# showing the old and transformed variables
df.select(["Gender", "GenderIndex", "DayOfTheWeek", "DayOfTheWeekIndex", "Status", "StatusIndex"]).show()

#### Slide 25

In [None]:
from pyspark.ml.feature import VectorAssembler

# Input columns that are combined into a single vector column for the model.
# note how the date variables are not part of our model
# NOTE(review): spellings such as 'Alcoolism', 'HiperTension' and 'Handcap' presumably
# mirror the CSV header exactly -- confirm before "correcting" them.
predictors = ['Age', 'GenderIndex', 'DayOfTheWeekIndex', 'Diabetes', 'Alcoolism', "HiperTension",
              "Handcap", "Smokes", "Scholarship", "Tuberculosis",  "Sms_Reminder", "AwaitingTime"]
assembler = VectorAssembler(inputCols=predictors, outputCol="predictors")

# Drop any previous "predictors" column (a no-op the first time) so this cell
# can be re-run without failing on an already existing output column.
df = assembler.transform(df.drop("predictors"))

# Show the assembled vectors without truncating them
df.select("predictors").show(truncate=False)

#### Slide 26

In [None]:
# Split the target and the predictors into 75% training / 25% test data.
# A fixed seed makes the random split repeatable from run to run.
training_set, test_set = df.select(["StatusIndex", "predictors"]).randomSplit([0.75, 0.25], seed=42)

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest estimator: 100 trees, reading the "predictors" vector column
# and using StatusIndex as the label. The fixed seed makes training repeatable.
rf_classifier = RandomForestClassifier(numTrees=100, featuresCol="predictors", labelCol='StatusIndex', seed=42)

# `model` holds only the fitted model; the unfitted estimator keeps its own name
# instead of being overwritten.
model = rf_classifier.fit(training_set)

#### Slide 27

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Apply the fitted model to the test set
predictions = model.transform(test_set)

# Actual label next to the predicted one, for the first 10 rows
label_vs_prediction = predictions.select("StatusIndex", "prediction")
label_vs_prediction.show(10)

In [None]:
# Accuracy of the "prediction" column measured against the StatusIndex label
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="StatusIndex",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % accuracy)

#### Slide 29

In [None]:
# Register the data as a temporary view named "TABLE" so that it can be
# queried through spark.sql.
# note we are using raw_data here (the data as loaded, not the transformed df)
raw_data.createOrReplaceTempView("TABLE")

In [None]:
# Select the AGE column from the temporary view.
# The view name is backtick-quoted because TABLE is also a SQL keyword; quoting
# keeps the query valid even when Spark enforces ANSI reserved keywords.
spark.sql("SELECT AGE FROM `TABLE`").show()


In [None]:
# Count the rows for each Status value in the temporary view.
# The view name is backtick-quoted because TABLE is also a SQL keyword; quoting
# keeps the query valid even when Spark enforces ANSI reserved keywords.
spark.sql("SELECT Status, COUNT(Status) FROM `TABLE` GROUP BY Status").show()


In [None]:
# List the rows of the temporary view that have a negative age.
# The view name is backtick-quoted because TABLE is also a SQL keyword; quoting
# keeps the query valid even when Spark enforces ANSI reserved keywords.
spark.sql("SELECT * FROM `TABLE` WHERE AGE < 0").show()
