In [None]:
#!pip install pyspark

In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
# create a SparkSession
spark = SparkSession.builder.appName('PySpark Practice').getOrCreate()

23/04/01 20:42:30 WARN Utils: Your hostname, Alirezas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.0.0.202 instead (on interface en0)
23/04/01 20:42:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/01 20:42:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

In [5]:
# create a DataFrame (1st approach)
#df_pandas = pd.read_csv('test1.csv')
df_pyspark = spark.read.csv('test1.csv')
df_pyspark

                                                                                

DataFrame[_c0: string, _c1: string, _c2: string]

In [6]:
df_pyspark.show()

+-------+---+---------+
|    _c0|_c1|      _c2|
+-------+---+---------+
|   Name|Age|      DoB|
|Alireza| 18|Apr25XXXX|
|Yasaman| 18|Feb28XXXX|
|Artemis|  1|Apr16XXXX|
+-------+---+---------+



In [7]:
# create a DataFrame (2nd approach)
df_pyspark = spark.read.option('header', 'true').csv('test1.csv')
df_pyspark

DataFrame[Name: string, Age: string, DoB: string]

In [8]:
df_pyspark.show()

+-------+---+---------+
|   Name|Age|      DoB|
+-------+---+---------+
|Alireza| 18|Apr25XXXX|
|Yasaman| 18|Feb28XXXX|
|Artemis|  1|Apr16XXXX|
+-------+---+---------+



In [9]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [10]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- DoB: string (nullable = true)



In [11]:
# create a DataFrame (3rd and best approach)
df_pyspark = spark.read.csv('test1.csv', header=True, inferSchema=True)
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- DoB: string (nullable = true)



In [12]:
# return column headers 
df_pyspark.columns

['Name', 'Age', 'DoB']

In [13]:
df_pyspark.head(3)

[Row(Name='Alireza', Age=18, DoB='Apr25XXXX'),
 Row(Name='Yasaman', Age=18, DoB='Feb28XXXX'),
 Row(Name='Artemis', Age=1, DoB='Apr16XXXX')]

In [14]:
# df_pandas['name']
df_pyspark['Name'].show()

TypeError: 'Column' object is not callable

In [15]:
df_pyspark.select('Name').show()

+-------+
|   Name|
+-------+
|Alireza|
|Yasaman|
|Artemis|
+-------+



In [16]:
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'int'), ('DoB', 'string')]

In [17]:
df_pyspark.describe().show()

[Stage 10:>                                                         (0 + 1) / 1]

+-------+-------+------------------+---------+
|summary|   Name|               Age|      DoB|
+-------+-------+------------------+---------+
|  count|      3|                 3|        3|
|   mean|   null|12.333333333333334|     null|
| stddev|   null| 9.814954576223638|     null|
|    min|Alireza|                 1|Apr16XXXX|
|    max|Yasaman|                18|Feb28XXXX|
+-------+-------+------------------+---------+



                                                                                

In [18]:
# add column to the dataframe
df_pyspark = df_pyspark.withColumn('DoB', ['Apr2519XX', 'Feb2119XX', 'Apr1620XX'])

TypeError: col should be Column

In [19]:
# drop column from the dataframe
df_pyspark.drop('Name').show()

+---+---------+
|Age|      DoB|
+---+---------+
| 18|Apr25XXXX|
| 18|Feb28XXXX|
|  1|Apr16XXXX|
+---+---------+



In [20]:
# rename the columns
df_pyspark.withColumnRenamed('Name', 'FirstName').show()

+---------+---+---------+
|FirstName|Age|      DoB|
+---------+---+---------+
|  Alireza| 18|Apr25XXXX|
|  Yasaman| 18|Feb28XXXX|
|  Artemis|  1|Apr16XXXX|
+---------+---+---------+



In [21]:
# drop null values
df_pyspark.na.drop() # default is how='any'
df_pyspark.na.drop(how='all')
df_pyspark.na.drop(thresh=2) # if at least 2 non-null values exist, it willt drop the row
df_pyspark.na.drop(subset=['DoB']) # drop if null exists in subset columns

DataFrame[Name: string, Age: int, DoB: string]

In [22]:
# fill missing values
df_pyspark.na.fill('MissingValues') # fill all null values with "MissingValues"
df_pyspark.na.fill('MissingValue', 'DoB') # fill null values in "DoB" column with "MissingValues"

DataFrame[Name: string, Age: int, DoB: string]

In [23]:
# fill null values with "mean" (or "mode", or "median")
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['Age'],
    outputCols = ['{}_imputed'.format(c) for c in ['Age']]
    ).setStrategy('mean')

In [24]:
imputer.fit(df_pyspark).transform(df_pyspark).show

<bound method DataFrame.show of DataFrame[Name: string, Age: int, DoB: string, Age_imputed: int]>

### Filter Operation

In [25]:
df_pyspark.filter("Age > 2").show()

+-------+---+---------+
|   Name|Age|      DoB|
+-------+---+---------+
|Alireza| 18|Apr25XXXX|
|Yasaman| 18|Feb28XXXX|
+-------+---+---------+



In [26]:
df_pyspark.filter(df_pyspark['Age'] > 2).show()

+-------+---+---------+
|   Name|Age|      DoB|
+-------+---+---------+
|Alireza| 18|Apr25XXXX|
|Yasaman| 18|Feb28XXXX|
+-------+---+---------+



In [27]:
df_pyspark.filter((df_pyspark['Age'] > 2) & (df_pyspark['Name'] == 'Yasaman')).show()

+-------+---+---------+
|   Name|Age|      DoB|
+-------+---+---------+
|Yasaman| 18|Feb28XXXX|
+-------+---+---------+



In [28]:
df_pyspark.filter((df_pyspark['Age'] > 2) | (df_pyspark['Name'] == 'Artemis')).show()

+-------+---+---------+
|   Name|Age|      DoB|
+-------+---+---------+
|Alireza| 18|Apr25XXXX|
|Yasaman| 18|Feb28XXXX|
|Artemis|  1|Apr16XXXX|
+-------+---+---------+



In [29]:
df_pyspark.filter(~(df_pyspark['Age'] > 2)).show()

+-------+---+---------+
|   Name|Age|      DoB|
+-------+---+---------+
|Artemis|  1|Apr16XXXX|
+-------+---+---------+



In [30]:
df_pyspark.filter(df_pyspark['Age'] > 2).select(['Name', 'DoB']).show()

+-------+---------+
|   Name|      DoB|
+-------+---------+
|Alireza|Apr25XXXX|
|Yasaman|Feb28XXXX|
+-------+---------+



### GroupBy and Agrregtion

In [None]:
df_pyspark.groupBy('Name').sum().show()

In [None]:
df_pyspark.groupBy('Department').mean().show()

In [None]:
df_pyspark.groupBy('Department').count().show()

In [None]:
df_pyspark.agg({'Salary': 'sum'}).show()

# Machine Learning

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ML').getOrCreate()

In [None]:
# read dataset
spark_df = spark.read.csv('test1.csv', header=True, inferSchema=True)
spark_df.show()

In [None]:
spark_df.printSchema()

In [None]:
spark_df.columns

# 