In [1]:
import pyspark

In [2]:
from pyspark import SparkContext

In [3]:
sc = SparkContext()

In [4]:
# Resilient Distributed Dataset. 
nums = sc.parallelize([1,2,3,4])

In [None]:
nums.take(2)

In [None]:
# Applying some transformation with a lambda function to our set
squared = nums.map(lambda x: x*x).collect()

In [None]:
# Using SQLContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
sqlContext = SQLContext(sc)

In [12]:
# But SQLContext it's already deprecated use SparkSession instead
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Session3').getOrCreate()

In [6]:
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]

In [None]:
# Make a parallel RDD collection
rdd = sc.parallelize(list_p)

In [None]:
rdd

In [None]:
# Convert the RDD to tuples
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

In [None]:
ppl

In [None]:
df_ppl = spark.createDataFrame(ppl)

In [None]:
df_ppl

In [None]:
df_ppl.printSchema()

In [None]:
file = "adult_data.csv"
from pyspark import SparkFiles
#sc.addFile(file)

# Add a file to be downloaded with this Spark job on every node
spark.sparkContext.addFile(file)

# sqlContext = SQLContext(sc)

In [None]:
# sqlContext.read.csv

# Load your file to your sql context inside spark session
df = spark.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

In [None]:
df.printSchema()

In [None]:
df.show(5, truncate = False)

In [213]:
# Import all from `sql.types`
from pyspark.sql.types import *

In [214]:
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

In [215]:
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital-gain', 'educational-num', 'capital-loss', 'hours-per-week']

In [216]:
# Convert the type
df_string = convertColumn(df, CONTI_FEATURES, FloatType())

In [217]:
df_string.printSchema()

root
 |-- age: float (nullable = true)
 |-- age-square: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- x: integer (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [218]:
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="educational-num", outputCol="educational-num-index")
model = stringIndexer.fit(df)
df = model.transform(df)
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- age-square: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- x: integer (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- educational-num-index: double (nullable = false)



In [219]:
df.select(['educational-num','educational-num-index']).show(truncate=False)

+---------------+---------------------+
|educational-num|educational-num-index|
+---------------+---------------------+
|7              |5.0                  |
|9              |0.0                  |
|12             |6.0                  |
|10             |1.0                  |
|10             |1.0                  |
|6              |7.0                  |
|9              |0.0                  |
|15             |9.0                  |
|10             |1.0                  |
|4              |8.0                  |
|9              |0.0                  |
|13             |2.0                  |
|9              |0.0                  |
|9              |0.0                  |
|9              |0.0                  |
|14             |3.0                  |
|10             |1.0                  |
|9              |0.0                  |
|9              |0.0                  |
|16             |12.0                 |
+---------------+---------------------+
only showing top 20 rows



In [220]:

df.groupBy("education").count().sort("count",ascending=True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [221]:

df.describe().show()

+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------------------+------------------+-----------------+------------------+--------------+------+---------------------+
|summary|               age|        age-square|  workclass|            fnlwgt|   education|   educational-num|marital-status|      occupation|relationship|              race|                 x|      capital-gain|     capital-loss|    hours-per-week|native-country|income|educational-num-index|
+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------------------+------------------+-----------------+------------------+--------------+------+---------------------+
|  count|             48842|             48842|      48842|             48842|       48842|             48842|        

In [222]:

df.describe('capital-gain').show()

+-------+------------------+
|summary|      capital-gain|
+-------+------------------+
|  count|             48842|
|   mean|1079.0676262233324|
| stddev| 7452.019057655413|
|    min|                 0|
|    max|             99999|
+-------+------------------+



In [223]:
df.select('income').show()

+------+
|income|
+------+
| <=50K|
| <=50K|
|  >50K|
|  >50K|
| <=50K|
| <=50K|
| <=50K|
|  >50K|
| <=50K|
| <=50K|
|  >50K|
| <=50K|
| <=50K|
| <=50K|
|  >50K|
|  >50K|
| <=50K|
| <=50K|
| <=50K|
|  >50K|
+------+
only showing top 20 rows



In [224]:
# If we want to know how the age get related to the income
df.crosstab('age','income').sort('age_income').show()

+----------+-----+----+
|age_income|<=50K|>50K|
+----------+-----+----+
|        17|  595|   0|
|        18|  862|   0|
|        19| 1050|   3|
|        20| 1112|   1|
|        21| 1090|   6|
|        22| 1161|  17|
|        23| 1307|  22|
|        24| 1162|  44|
|        25| 1119|  76|
|        26| 1068|  85|
|        27| 1117| 115|
|        28| 1101| 179|
|        29| 1025| 198|
|        30| 1031| 247|
|        31| 1050| 275|
|        32|  957| 296|
|        33| 1045| 290|
|        34|  949| 354|
|        35|  997| 340|
|        36|  948| 400|
+----------+-----+----+
only showing top 20 rows



In [225]:
# Drop a column
df.drop('educational-num').columns

['age',
 'age-square',
 'workclass',
 'fnlwgt',
 'education',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'x',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income',
 'educational-num-index']

In [226]:
# Filter by one column as a paremeter
df.filter(df.age > 40).count()

20211

In [227]:
# Filter by TWO columns as a paremeter
df.filter((df.age > 40) & (df.income == '>50K')).count()

7086

In [228]:
# Aggregate data by group
df.groupby('marital-status').agg({'capital-gain': 'mean'}).show()

+--------------------+------------------+
|      marital-status| avg(capital-gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



In [229]:
df.groupby('marital-status').agg({'*': 'count'}).show()

+--------------------+--------+
|      marital-status|count(1)|
+--------------------+--------+
|           Separated|    1530|
|       Never-married|   16117|
|Married-spouse-ab...|     628|
|            Divorced|    6633|
|             Widowed|    1518|
|   Married-AF-spouse|      37|
|  Married-civ-spouse|   22379|
+--------------------+--------+



In [230]:
from pyspark.sql import functions as F
df.groupby('marital-status').agg(F.min(df.age)).show()

+--------------------+--------+
|      marital-status|min(age)|
+--------------------+--------+
|           Separated|      18|
|       Never-married|      17|
|Married-spouse-ab...|      17|
|            Divorced|      18|
|             Widowed|      17|
|   Married-AF-spouse|      19|
|  Married-civ-spouse|      17|
+--------------------+--------+



In [231]:
df.select('marital-status','age').filter((df['marital-status'] == 'Never-married') & (df.age < 18) ).show()

+--------------+---+
|marital-status|age|
+--------------+---+
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
| Never-married| 17|
+--------------+---+
only showing top 20 rows



In [232]:
# Apply the transformation and add it to the DataFrame
df = df.withColumn("age-square", F.col("age") ** 2)

In [233]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- age-square: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- x: integer (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- educational-num-index: double (nullable = false)



In [234]:
# Change the order of the displayed columns
COLUMNS = ['age', 'age-square', 'workclass', 'fnlwgt', 'education', 'educational-num', 'marital-status',
           'occupation', 'relationship', 'race', 'x', 'capital-gain', 'capital-loss',
           'hours-per-week', 'native-country', 'income']
df = df.select(COLUMNS)
df.first()

Row(age=25, age-square=625.0, workclass='Private', fnlwgt=226802, education='11th', educational-num=7, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', x=1, capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States', income='<=50K')

In [235]:
# Remove the netherlands 
df_remove = df.filter(df['native-country'] != 'Holand-Netherlands')

In [236]:
df.dtypes

[('age', 'int'),
 ('age-square', 'double'),
 ('workclass', 'string'),
 ('fnlwgt', 'int'),
 ('education', 'string'),
 ('educational-num', 'int'),
 ('marital-status', 'string'),
 ('occupation', 'string'),
 ('relationship', 'string'),
 ('race', 'string'),
 ('x', 'int'),
 ('capital-gain', 'int'),
 ('capital-loss', 'int'),
 ('hours-per-week', 'int'),
 ('native-country', 'string'),
 ('income', 'string')]

In [237]:
# Load the test2.csv
df_pyspark=spark.read.csv('test2.csv',header=True,inferSchema=True)


In [238]:
df_pyspark.show()

+-------+----+----------+------+
|   Name| age|Experience|Salary|
+-------+----+----------+------+
|   Karl|  31|        10| 30000|
| Sussie|  30|         8| 25000|
|  Sunny|  29|         4| 20000|
|   Paul|  24|         3| 20000|
|  Henry|  21|         1| 15000|
|Shutter|  23|         2| 18000|
|  Manny|null|      null| 40000|
|   null|  34|        10| 38000|
|   null|  36|      null|  null|
+-------+----+----------+------+



In [239]:
# Drop all the rows that contains a null value
df_pyspark.na.drop().show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|   Karl| 31|        10| 30000|
| Sussie| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
|  Henry| 21|         1| 15000|
|Shutter| 23|         2| 18000|
+-------+---+----------+------+



In [240]:
# The same but for ANY value
df_pyspark.na.drop(how="any").show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|   Karl| 31|        10| 30000|
| Sussie| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
|  Henry| 21|         1| 15000|
|Shutter| 23|         2| 18000|
+-------+---+----------+------+



In [241]:
# Threshold: Preserve if the row has 3 or more attributes with a NOT_NULL VALUE
df_pyspark.na.drop(how="any",thresh=3).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|   Karl| 31|        10| 30000|
| Sussie| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
|  Henry| 21|         1| 15000|
|Shutter| 23|         2| 18000|
|   null| 34|        10| 38000|
+-------+---+----------+------+



In [242]:
# Apply only to the column age
df_pyspark.na.drop(how="any",subset=['Age']).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|   Karl| 31|        10| 30000|
| Sussie| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
|  Henry| 21|         1| 15000|
|Shutter| 23|         2| 18000|
|   null| 34|        10| 38000|
|   null| 36|      null|  null|
+-------+---+----------+------+



In [243]:
# Filling the Missing Value
df_pyspark.na.fill(0,['Experience','age']).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|   Karl| 31|        10| 30000|
| Sussie| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
|  Henry| 21|         1| 15000|
|Shutter| 23|         2| 18000|
|  Manny|  0|         0| 40000|
|   null| 34|        10| 38000|
|   null| 36|         0|  null|
+-------+---+----------+------+



In [244]:
from pyspark.ml.feature import Imputer

# Define imputer columns and output columns
imputer = Imputer(
    inputCols = ['age', 'Experience', 'Salary'], 
    outputCols = [
        "{}_imputed".format(c) for c in ['age', 'Experience', 'Salary']
    ]).setStrategy("median")

# Add imputation cols to df
imputer.fit(df_pyspark).transform(df_pyspark).show()

+-------+----+----------+------+-----------+------------------+--------------+
|   Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+-------+----+----------+------+-----------+------------------+--------------+
|   Karl|  31|        10| 30000|         31|                10|         30000|
| Sussie|  30|         8| 25000|         30|                 8|         25000|
|  Sunny|  29|         4| 20000|         29|                 4|         20000|
|   Paul|  24|         3| 20000|         24|                 3|         20000|
|  Henry|  21|         1| 15000|         21|                 1|         15000|
|Shutter|  23|         2| 18000|         23|                 2|         18000|
|  Manny|null|      null| 40000|         29|                 4|         40000|
|   null|  34|        10| 38000|         34|                10|         38000|
|   null|  36|      null|  null|         36|                 4|         20000|
+-------+----+----------+------+-----------+--------

In [13]:
# Read The dataset
training = spark.read.csv('test1.csv',header=True,inferSchema=True)

In [246]:
training.show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|   Karl| 31|        10| 30000|
| Sussie| 30|         8| 25000|
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
|  Henry| 21|         1| 15000|
|Shutter| 23|         2| 18000|
+-------+---+----------+------+



In [247]:
# [Age,Experience]--> new feature --> independent feature

In [14]:
from pyspark.ml.feature import VectorAssembler

# Make the vector of the new feature
featureassembler = VectorAssembler(
    inputCols = ["age","Experience"],
    outputCol = "Independent Features"
)

In [15]:
# Transform yout current dataframe adding the new feature
output = featureassembler.transform(training)
output.show()

+-------+---+----------+------+--------------------+
|   Name|age|Experience|Salary|Independent Features|
+-------+---+----------+------+--------------------+
|   Karl| 31|        10| 30000|         [31.0,10.0]|
| Sussie| 30|         8| 25000|          [30.0,8.0]|
|  Sunny| 29|         4| 20000|          [29.0,4.0]|
|   Paul| 24|         3| 20000|          [24.0,3.0]|
|  Henry| 21|         1| 15000|          [21.0,1.0]|
|Shutter| 23|         2| 18000|          [23.0,2.0]|
+-------+---+----------+------+--------------------+



In [16]:
# Select the new feature and the independent feature that we need to predict
finalized_data = output.select("Independent Features","Salary")
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [17]:
from pyspark.ml.regression import LinearRegression

# Divide our data in data to train the model and data to predict
train_data,test_data = finalized_data.randomSplit([0.75,0.25])
test_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|          [23.0,2.0]| 18000|
|          [30.0,8.0]| 25000|
|         [31.0,10.0]| 30000|
+--------------------+------+



In [18]:
# Create a new regressor in order to input the features and the dependant feature
regressor = LinearRegression(
    featuresCol = 'Independent Features', 
    labelCol = 'Salary'
)

In [19]:
# Training our model
regressor = regressor.fit(train_data)

In [20]:
# Coefficients
regressor.coefficients

DenseVector([-714.2857, 3571.4286])

In [21]:
# Intercepts
regressor.intercept

26428.57142857082

In [22]:
# Prediction
pred_results = regressor.evaluate(test_data)

In [23]:
pred_results.predictions.show()



+--------------------+------+-----------------+
|Independent Features|Salary|       prediction|
+--------------------+------+-----------------+
|          [23.0,2.0]| 18000|17142.85714285713|
|          [30.0,8.0]| 25000| 33571.4285714283|
|         [31.0,10.0]| 30000|39999.99999999959|
+--------------------+------+-----------------+



In [25]:
pred_results.meanAbsoluteError,pred_results.meanSquaredError

(6476.190476190255, 58068027.21088012)