# 1 - Basic operations with dataframes

In [19]:
!pip install pyspark



In [113]:
from pyspark.sql import SparkSession

In [114]:
# Create the Spark session for this notebook (or return the one already running).
spark = (
    SparkSession.builder
    .appName('Dataframe')
    .getOrCreate()
)

In [115]:
# Echo the SparkSession so Jupyter renders its repr as the cell output,
# a quick check that the session created above is live.
spark

### Read the dataset

In [37]:
# Load test1.csv: the first line is the header and Spark infers the column types.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('test1.csv')
)
df.show()

+------+---+----------+
|  Name|Age|Experience|
+------+---+----------+
| Julia| 20|       0.0|
|Marcin| 22|       1.0|
|Janusz| 30|      10.0|
+------+---+----------+



### Check the schema

In [38]:
# Print the schema Spark inferred from the CSV: column names, types and nullability.
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: double (nullable = true)



### Reading rows and columns of a DataFrame

In [39]:
# Column names, in order, as a plain Python list.
df.columns

['Name', 'Age', 'Experience']

In [40]:
# First two rows, returned to the driver as a list of Row objects.
df.take(2)

[Row(Name='Julia', Age=20, Experience=0.0),
 Row(Name='Marcin', Age=22, Experience=1.0)]

In [42]:
# Project a single column, referenced as a Column object.
df.select(df['Name']).show()

+------+
|  Name|
+------+
| Julia|
|Marcin|
|Janusz|
+------+



In [43]:
# Project several columns; select() also accepts them as separate arguments.
df.select('Name', 'Experience').show()

+------+----------+
|  Name|Experience|
+------+----------+
| Julia|       0.0|
|Marcin|       1.0|
|Janusz|      10.0|
+------+----------+



In [44]:
# (column name, Spark type name) pairs: a compact form of printSchema().
df.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'double')]

In [45]:
# Summary statistics (count, mean, stddev, min, max) for each column.
summary_df = df.describe()
summary_df.show()

+-------+------+-----------------+------------------+
|summary|  Name|              Age|        Experience|
+-------+------+-----------------+------------------+
|  count|     3|                3|                 3|
|   mean|  null|             24.0|3.6666666666666665|
| stddev|  null|5.291502622129181| 5.507570547286102|
|    min|Janusz|               20|               0.0|
|    max|Marcin|               30|              10.0|
+-------+------+-----------------+------------------+



### Adding columns to a DataFrame

In [48]:
df = df.withColumn('Eperience after 2 years', df['Experience'] + 2)
df.show()

+------+---+----------+-----------------------+
|  Name|Age|Experience|Eperience after 2 years|
+------+---+----------+-----------------------+
| Julia| 20|       0.0|                    2.0|
|Marcin| 22|       1.0|                    3.0|
|Janusz| 30|      10.0|                   12.0|
+------+---+----------+-----------------------+



### Dropping columns in dataframe

In [50]:
df = df.drop('Eperience after 2 years')
df.show()

+------+---+----------+
|  Name|Age|Experience|
+------+---+----------+
| Julia| 20|       0.0|
|Marcin| 22|       1.0|
|Janusz| 30|      10.0|
+------+---+----------+



### Renaming columns

In [51]:
# Rename one column; all other columns keep their names.
df = df.withColumnRenamed(existing='Name', new='New Name')
df.show()

+--------+---+----------+
|New Name|Age|Experience|
+--------+---+----------+
|   Julia| 20|       0.0|
|  Marcin| 22|       1.0|
|  Janusz| 30|      10.0|
+--------+---+----------+



# 2 - Handling missing values

In [54]:
# Load the second dataset, which contains missing values (shown as null).
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./test2.csv')
)
df.show()

+------+----+----------+--------+
|  Name| Age|Experience|  Salary|
+------+----+----------+--------+
| Julia|  20|         0|    null|
|Marcin|  22|         1| 12000.0|
|Janusz|  30|        10| 10000.0|
|Maciej|  29|         9|  5000.0|
|Lukasz|null|        32|100000.0|
|   Ala|  25|      null|  4000.0|
+------+----+----------+--------+



### Dropping the rows with null values

In [56]:
# Keep only the rows with no null in any column.
df.dropna().show()

+------+---+----------+-------+
|  Name|Age|Experience| Salary|
+------+---+----------+-------+
|Marcin| 22|         1|12000.0|
|Janusz| 30|        10|10000.0|
|Maciej| 29|         9| 5000.0|
+------+---+----------+-------+



### Threshold

`thresh=n` keeps only the rows that have at least `n` non-null values.

In [59]:
# Keep rows with at least 3 non-null values (every row here qualifies).
df.dropna(thresh=3).show()

+------+----+----------+--------+
|  Name| Age|Experience|  Salary|
+------+----+----------+--------+
| Julia|  20|         0|    null|
|Marcin|  22|         1| 12000.0|
|Janusz|  30|        10| 10000.0|
|Maciej|  29|         9|  5000.0|
|Lukasz|null|        32|100000.0|
|   Ala|  25|      null|  4000.0|
+------+----+----------+--------+



In [61]:
# Keep rows with at least 4 non-null values, i.e. rows with no nulls in this 4-column frame.
df.dropna(thresh=4).show()

+------+---+----------+-------+
|  Name|Age|Experience| Salary|
+------+---+----------+-------+
|Marcin| 22|         1|12000.0|
|Janusz| 30|        10|10000.0|
|Maciej| 29|         9| 5000.0|
+------+---+----------+-------+



### Filling the missing values

In [66]:
# Replace nulls with -1, but only in the Experience and Age columns.
df.fillna(-1, subset=['Experience', 'Age']).show()

+------+---+----------+--------+
|  Name|Age|Experience|  Salary|
+------+---+----------+--------+
| Julia| 20|         0|    null|
|Marcin| 22|         1| 12000.0|
|Janusz| 30|        10| 10000.0|
|Maciej| 29|         9|  5000.0|
|Lukasz| -1|        32|100000.0|
|   Ala| 25|        -1|  4000.0|
+------+---+----------+--------+



In [68]:
from pyspark.ml.feature import Imputer

# Mean imputer that writes each result to a new '<col>_imputed' column,
# leaving the original columns untouched.
imputed_cols = ['Age', 'Experience', 'Salary']
imputer = Imputer(
    strategy='mean',
    inputCols=imputed_cols,
    outputCols=[name + '_imputed' for name in imputed_cols],
)

### Add the imputed columns to the DataFrame

In [69]:
# Learn the per-column fill values, then append the *_imputed columns.
imputer_model = imputer.fit(df)
imputer_model.transform(df).show()

+------+----+----------+--------+-----------+------------------+--------------+
|  Name| Age|Experience|  Salary|Age_imputed|Experience_imputed|Salary_imputed|
+------+----+----------+--------+-----------+------------------+--------------+
| Julia|  20|         0|    null|         20|                 0|       26200.0|
|Marcin|  22|         1| 12000.0|         22|                 1|       12000.0|
|Janusz|  30|        10| 10000.0|         30|                10|       10000.0|
|Maciej|  29|         9|  5000.0|         29|                 9|        5000.0|
|Lukasz|null|        32|100000.0|         25|                32|      100000.0|
|   Ala|  25|      null|  4000.0|         25|                10|        4000.0|
+------+----+----------+--------+-----------+------------------+--------------+



# 3 - Filter operations

In [73]:
# Median imputation used to prepare the data for the filter examples below.
# The output columns reuse the input names, so the nulls are replaced in place
# instead of being added as new *_imputed columns.
imputed_cols = ['Age', 'Experience', 'Salary']
imputer = Imputer(
    inputCols=imputed_cols,
    outputCols=imputed_cols,
).setStrategy('median')

In [74]:
# Replace the nulls in df with the imputed values (the imputer above writes
# back to the same column names) and show the result.
# NOTE(review): the output below does not look median-imputed. Julia's Salary
# is 26200.0, the mean of the five known salaries (their median is 10000.0),
# and Ala's Experience is 10 (presumably the mean 10.4 cast to int; the median
# is 9). This table and the aggregates further down were likely produced with
# the 'mean' strategy. Re-run from a fresh kernel to confirm.
df = imputer.fit(df).transform(df)
df.show()

+------+---+----------+--------+
|  Name|Age|Experience|  Salary|
+------+---+----------+--------+
| Julia| 20|         0| 26200.0|
|Marcin| 22|         1| 12000.0|
|Janusz| 30|        10| 10000.0|
|Maciej| 29|         9|  5000.0|
|Lukasz| 25|        32|100000.0|
|   Ala| 25|        10|  4000.0|
+------+---+----------+--------+



### People whose salary is less than or equal to 12000

In [76]:
# Filter with a SQL expression string; where() is an alias of filter().
df.where("Salary <= 12000").show()

+------+---+----------+-------+
|  Name|Age|Experience| Salary|
+------+---+----------+-------+
|Marcin| 22|         1|12000.0|
|Janusz| 30|        10|10000.0|
|Maciej| 29|         9| 5000.0|
|   Ala| 25|        10| 4000.0|
+------+---+----------+-------+



In [77]:
# The same condition, written as a Column comparison.
df.where(df['Salary'] <= 12000).show()

+------+---+----------+-------+
|  Name|Age|Experience| Salary|
+------+---+----------+-------+
|Marcin| 22|         1|12000.0|
|Janusz| 30|        10|10000.0|
|Maciej| 29|         9| 5000.0|
|   Ala| 25|        10| 4000.0|
+------+---+----------+-------+



In [78]:
# Filter first, then keep only the Name and Age columns.
low_earners = df.filter(df.Salary <= 12000)
low_earners.select('Name', 'Age').show()

+------+---+
|  Name|Age|
+------+---+
|Marcin| 22|
|Janusz| 30|
|Maciej| 29|
|   Ala| 25|
+------+---+



In [83]:
# Combine conditions with & (AND); each comparison needs its own parentheses.
in_salary_range = (df['Salary'] > 4000) & (df['Salary'] <= 12000)
df.filter(in_salary_range).show()

+------+---+----------+-------+
|  Name|Age|Experience| Salary|
+------+---+----------+-------+
|Marcin| 22|         1|12000.0|
|Janusz| 30|        10|10000.0|
|Maciej| 29|         9| 5000.0|
+------+---+----------+-------+



In [84]:
# An OR of two equality checks, written as a membership test.
df.filter(df.Age.isin(22, 30)).show()

+------+---+----------+-------+
|  Name|Age|Experience| Salary|
+------+---+----------+-------+
|Marcin| 22|         1|12000.0|
|Janusz| 30|        10|10000.0|
+------+---+----------+-------+



In [85]:
# Negate with ~ (NOT): everyone whose Age is neither 22 nor 30.
df.filter(~df.Age.isin(22, 30)).show()

+------+---+----------+--------+
|  Name|Age|Experience|  Salary|
+------+---+----------+--------+
| Julia| 20|         0| 26200.0|
|Maciej| 29|         9|  5000.0|
|Lukasz| 25|        32|100000.0|
|   Ala| 25|        10|  4000.0|
+------+---+----------+--------+



# 4 - Group by and aggregate functions

### Group by

In [89]:
# Sum every numeric column within each Name group.
df.groupBy(df['Name']).sum().show()

+------+--------+---------------+-----------+
|  Name|sum(Age)|sum(Experience)|sum(Salary)|
+------+--------+---------------+-----------+
|Janusz|      30|             10|    10000.0|
| Julia|      20|              0|    26200.0|
|Maciej|      29|              9|     5000.0|
|   Ala|      25|             10|     4000.0|
|Marcin|      22|              1|    12000.0|
|Lukasz|      25|             32|   100000.0|
+------+--------+---------------+-----------+



In [91]:
# Number of rows that share each Experience value.
df.groupBy(df['Experience']).count().show()

+----------+-----+
|Experience|count|
+----------+-----+
|         1|    1|
|         9|    1|
|        10|    2|
|        32|    1|
|         0|    1|
+----------+-----+



In [93]:
# Mean Experience over the whole DataFrame (a global aggregate with no grouping key).
df.groupBy().avg('Experience').show()

+------------------+
|   avg(Experience)|
+------------------+
|10.333333333333334|
+------------------+



In [94]:
# Highest Salary over the whole DataFrame.
df.groupBy().max('Salary').show()

+-----------+
|max(Salary)|
+-----------+
|   100000.0|
+-----------+



# 5 - Example of PySpark ML

In [95]:
# Show the imputed DataFrame that the ML example below starts from.
df.show()

+------+---+----------+--------+
|  Name|Age|Experience|  Salary|
+------+---+----------+--------+
| Julia| 20|         0| 26200.0|
|Marcin| 22|         1| 12000.0|
|Janusz| 30|        10| 10000.0|
|Maciej| 29|         9|  5000.0|
|Lukasz| 25|        32|100000.0|
|   Ala| 25|        10|  4000.0|
+------+---+----------+--------+



In [96]:
# Check the column types before assembling features (Age and Experience are integers here).
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: double (nullable = true)



In [97]:
# Columns available for the features (Age, Experience) and the label (Salary).
df.columns

['Name', 'Age', 'Experience', 'Salary']

In [99]:
from pyspark.ml.feature import VectorAssembler
# Pack Age and Experience into one vector column, the form the regressor's
# featuresCol expects.
featureassembler = VectorAssembler(
    inputCols=['Age', 'Experience'],
    outputCol='Independent feature',
)

In [100]:
# Build a new DataFrame with the 'Independent feature' vector column appended,
# assembled from Age and Experience.
output = featureassembler.transform(df)

In [101]:
# Each row's feature vector holds [Age, Experience] as doubles.
output.show()

+------+---+----------+--------+-------------------+
|  Name|Age|Experience|  Salary|Independent feature|
+------+---+----------+--------+-------------------+
| Julia| 20|         0| 26200.0|         [20.0,0.0]|
|Marcin| 22|         1| 12000.0|         [22.0,1.0]|
|Janusz| 30|        10| 10000.0|        [30.0,10.0]|
|Maciej| 29|         9|  5000.0|         [29.0,9.0]|
|Lukasz| 25|        32|100000.0|        [25.0,32.0]|
|   Ala| 25|        10|  4000.0|        [25.0,10.0]|
+------+---+----------+--------+-------------------+



In [102]:
# Keep only the feature vector and the label (Salary) for training.
finalized_data = output.select('Independent feature', 'Salary')
finalized_data.show()

+-------------------+--------+
|Independent feature|  Salary|
+-------------------+--------+
|         [20.0,0.0]| 26200.0|
|         [22.0,1.0]| 12000.0|
|        [30.0,10.0]| 10000.0|
|         [29.0,9.0]|  5000.0|
|        [25.0,32.0]|100000.0|
|        [25.0,10.0]|  4000.0|
+-------------------+--------+



In [104]:
from pyspark.ml.regression import LinearRegression
# Split with a fixed seed so the train/test rows, the fitted model and the
# metrics below come out the same on every Restart & Run All.
# NOTE(review): the dataset has only 6 rows, so the test split holds one or
# two rows; the coefficients and metrics are illustrative only.
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=42)
lin_reg = LinearRegression(featuresCol='Independent feature', labelCol='Salary')
# Keep the fitted model under `regressor`, the name the cells below use.
regressor = lin_reg.fit(train_data)

In [105]:
# Fitted weights, one per feature, in the assembler's input order: [Age, Experience].
regressor.coefficients

DenseVector([545.2747, -2456.2637])

In [106]:
# Fitted intercept (bias term) of the linear model.
regressor.intercept

14930.7692307697

In [117]:
# Score the held-out rows; the returned summary carries both predictions and error metrics.
pred_results = regressor.evaluate(test_data)
predictions_df = pred_results.predictions
predictions_df.show()

+-------------------+--------+------------------+
|Independent feature|  Salary|        prediction|
+-------------------+--------+------------------+
|         [22.0,1.0]| 12000.0| 24470.54945054943|
|        [25.0,32.0]|100000.0|-50037.80219780174|
+-------------------+--------+------------------+



In [118]:
# Test-set error metrics as (mean absolute error, mean squared error).
# NOTE(review): the test split shown above has only 2 rows, so these numbers
# say little about model quality.
pred_results.meanAbsoluteError, pred_results.meanSquaredError

(81254.17582417559, 11333428345.972641)