# Spark Intro for DAT 513

Based on

Chapter 3 "Learning Spark, 2nd Edition",  Damji, Lee, Wenig, Das, from O'Reilly 

In [2]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found beneath the Kaggle input directory.
for folder, _subdirs, names in os.walk('/kaggle/input'):
    for path in (os.path.join(folder, name) for name in names):
        print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [3]:
#install Apache Spark
!pip install pyspark --quiet

In [4]:
#Apache Spark Libraries
import pyspark
from pyspark.sql import SparkSession

#Apache Spark ML Classifier Libraries
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier,NaiveBayes

#Apache Spark Evaluation Library
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Apache Spark Features libraries
from pyspark.ml.feature import StandardScaler,StringIndexer, VectorAssembler, VectorIndexer, OneHotEncoder

#Apache Spark Pipeline Library
from pyspark.ml import Pipeline

# Apache Spark `DenseVector`
from pyspark.ml.linalg import DenseVector

#Data Split Libraries
import sklearn
from sklearn.model_selection import train_test_split


#Tabulating Data
from tabulate import tabulate

#Garbage
import gc

In [5]:
# Build (or reuse) the Spark session used by the rest of the notebook.
# Executor settings are applied one by one to the builder before the session
# is created.
executor_settings = {
    "spark.executor.memory": "1G",
    "spark.executor.cores": "4",
}
session_builder = SparkSession.builder.appName('Author_ages')
for setting_key, setting_value in executor_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()

Import the functions to handle the avg() calculation

In [6]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [7]:
# Build a small (name, age) DataFrame from in-memory tuples
people = [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)]
data_df = spark.createDataFrame(people, ["name", "age"])

# Bring rows with the same name together and average their ages
avg_df = data_df.groupBy("name").agg(avg("age"))

# Display the per-name averages
avg_df.show()

# NOTE: an earlier, disabled attempt to print the overall average used the
# Scala-style `collect(0)(0)`, which is not valid PySpark. If the overall
# average is wanted, `data_df.select(avg("age")).first()[0]` returns it.

Spark works with schemas, much like databases do, to specify how data is stored.

When loading data or setting up a DataFrame, we can let Spark infer the schema, or specify the schema ourselves.

Spark's data types are much like Python's data types, with slight differences; see

"Learning Spark, 2nd Edition" by Damji, Lee, Wenig and Das, from O'Reilly, chapter 3


In [8]:
# Example of a schema written as a DDL string: comma-separated "name TYPE" pairs.
schema = (
    "author STRING, "
    "title STRING, "
    "pages INT"
)

In [9]:
# DDL schema for the blog data: one "`name` TYPE" entry per column.
schema = (
    "`Id` INT, "
    "`First` STRING, "
    "`Last` STRING, "
    "`Url` STRING, "
    "`Published` STRING, "
    "`Hits` INT, "
    "`Campaigns` ARRAY<STRING>"
)

# Static sample rows as a list of lists; values follow the column order of the schema.
data = [
    [1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535,
     ["twitter", "LinkedIn"]],
    [2, "Brooke", "Wenig", "https://tinyurl.2", "5/5/2018", 8908,
     ["twitter", "LinkedIn"]],
    [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659,
     ["web", "twitter", "FB", "LinkedIn"]],
    [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568,
     ["twitter", "FB"]],
    [5, "Matei", "Zaharia", "https://tinyurl.5", "5/14/2014", 40578,
     ["web", "twitter", "FB", "LinkedIn"]],
    [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568,
     ["twitter", "LinkedIn"]],
]

# Convert the static rows to a Spark DataFrame, using the DDL schema defined above
blogs_df = spark.createDataFrame(data, schema)

# Show the DataFrame; it should reflect the rows listed above
blogs_df.show()

# Print the schema Spark uses for the DataFrame.
# printSchema() writes the schema tree to stdout itself and returns None, so it
# is called directly; wrapping it in print() would add a stray "None" line.
blogs_df.printSchema()

In [10]:
# Display the schema object Spark holds for blogs_df
blogs_df.schema

In [None]:
# Many operations on Spark DataFrames appear to match operations in Python

In [11]:
# Display the column names of blogs_df
blogs_df.columns

The dir function seems to work the same way, showing us the variables and member functions

In [12]:
# List the attribute and method names available on blogs_df
dir(blogs_df)

In [13]:
# Display each column of blogs_df together with its data type
blogs_df.dtypes

Row objects in Spark

In [14]:
from pyspark.sql import Row

# Build a single Row from positional values.
# NOTE(review): the fifth and sixth values (a number, then a date string) are in
# the opposite order to the Published/Hits columns of the blogs schema, and
# 255568 differs from the 25568 used for the same author in the sample data.
# Confirm whether that is intentional.
blog_fields = [
    6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015",
    ["twitter", "LinkedIn"],
]
blog_row = Row(*blog_fields)

# Individual items are reachable by position; index 1 is the second value
blog_row[1]


In [15]:
# Display the version reported by the active Spark session
spark.version

# Adding a data set: the Lyon housing set again

Go to Data in the Kaggle utilities (upper right), choose "Add data", search for "Lyon housing", and add the lyon-housing data set

In [17]:
# Gather the full path of every file beneath the Kaggle input directory,
# then print them one per line (confirms the newly added data set is visible).
input_paths = [
    os.path.join(root, fname)
    for root, _subdirs, fnames in os.walk('/kaggle/input')
    for fname in fnames
]
for input_path in input_paths:
    print(input_path)

In Spark

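a "Projection" is a way to return only the chosen columns of a table; the term "Projection" is from Relational Database terminology, and it is usually paired with a filter that returns only the rows matching a relational criterion
     in Python or R we may call these a "column selection" and a "row selection, or a row slice"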
     
Projections are done in Spark using the select() method - basically as you would in SQL

the conditions (or "filter") are specified in a filter() or where() method     --again, the where acts like SQL

In [16]:
# Location of the Lyon housing CSV inside the Kaggle input directory
url = '/kaggle/input/lyon-housing/lyon_housing.csv'

# Read the CSV, treating the first line as a header and letting Spark infer column types
csv_reader = spark.read.format("csv").options(header="true", inferSchema="true")
data = csv_reader.load(url)

# Cache the DataFrame for faster re-use in the cells below
data.cache()

In [17]:
# Preview the first 5 rows of the loaded data
data.show(5)

In [21]:
# Keep only the rows whose type_bien is "maison", then project the three
# columns of interest.
# The filter runs before select() so that its condition refers to a column that
# is still present, rather than relying on Spark to resolve a column the
# projection has already removed.
maison_df = (
    data
    .where(col("type_bien") == "maison")
    .select("surface_logement", "nombre_parkings", "prix")
)
maison_df.show(5)

In [None]:
# Calculate the number of distinct neighborhoods

In [22]:
# Count how many distinct non-null commune values the data contains
communes_present = data.select("commune").filter(col("commune").isNotNull())
distinct_commune_count = communes_present.agg(
    countDistinct("commune").alias("DistinctNeighborHoods")
)
distinct_commune_count.show()

In [23]:
# Show the distinct non-null commune values themselves
commune_is_set = col("commune").isNotNull()
distinct_communes = data.select("commune").filter(commune_is_set).distinct()
distinct_communes.show()

In [27]:
# Copy of the data with the "commune" column renamed.
# NOTE(review): "Neibhorhood" looks like a misspelling of "Neighborhood". It is
# left as-is here because the name is part of the resulting DataFrame's schema.
renamed_column = "Neibhorhood"
new_lyon = data.withColumnRenamed("commune", renamed_column)

new_lyon.show(5)

# Renamed column and price for the rows priced above 700000
over_700k = new_lyon.select(renamed_column, 'prix').filter(col('prix') > 700000)
over_700k.show(10)

In [38]:
# Convert date_transaction into a timestamp column named Trans_date and drop
# the original column.
# The pattern uses "MM" (month-of-year). Lowercase "mm" means minute-of-hour in
# Spark's datetime patterns, so "yyyy-mm-dd" would read the month digits as
# minutes and lose the month.
# NOTE(review): assumes date_transaction values start with a yyyy-MM-dd date;
# confirm against the CSV.
lyon_date_df = (
    data
    .withColumn("Trans_date", to_timestamp(col("date_transaction"), "yyyy-MM-dd"))
    .drop("date_transaction")
)

lyon_date_df.show(5)

# Select the converted column alongside the property type and price;
# the column name is spelled exactly as it was created above.
lyon_date_df.select("Trans_date", "type_bien", "prix").show(5, False)

In [36]:
# Preview the first 5 rows of the DataFrame with the converted date column
lyon_date_df.show(5)

With the date set, we can order the data by year, month and price

In [44]:
# Distinct (year, month, price) combinations, ordered by year, then month, then price
sale_year = year('Trans_date')
sale_month = month('Trans_date')
year_month_prices = lyon_date_df.select(sale_year, sale_month, "prix").distinct()
year_month_prices.orderBy(sale_year, sale_month, col("prix")).show(10)

In [45]:
# Number of rows per non-null commune, largest counts first
commune_counts = (
    lyon_date_df
    .select("commune")
    .filter(col("commune").isNotNull())
    .groupBy("commune")
    .count()
)
commune_counts.orderBy(col("count").desc()).show(10, False)

In [46]:
import pyspark.sql.functions as F

# Overall summary: average price, average surface, and the minimum and maximum price
summary_columns = [
    F.avg("prix"),
    F.avg("surface_logement"),
    F.min("prix"),
    F.max("prix"),
]
lyon_date_df.select(*summary_columns).show()