In [177]:
# Create SparkSession
# We don't need to create SparkContext everytime if we are dealing with DataFrmes - only for Spark 2.X and above
# SparkSession however is required
from pyspark.sql import SparkSession

In [181]:
# Create SparkSession
# Spark session get or create,Spark is designed to have only one active Spark session per JVM (Java Virtual Machine). 
# If you're running Spark code in an interactive environment or script, 
# it's common to see this warning when you attempt to create a new Spark session after one has already been established.
ss = SparkSession.builder.appName('playground-test').getOrCreate()

In [182]:
ss

In [183]:
# There are multiple ways we can read csv.
# Using read.option.csv and using read.csv(and then providing options here)
df = ss.read.csv('test.csv', header=True, inferSchema=True)
df.show()

+----------+---------+--------+--------------------+------+---+-----------+--------------+----------+----------+
|CustomerID|FirstName|LastName|               Email|Gender|Age|    Country|AccountCreated| LastLogin|TotalSpend|
+----------+---------+--------+--------------------+------+---+-----------+--------------+----------+----------+
|         1|     John|     Doe| johndoe@example.com|  Male| 28|        USA|    2023-01-01|2023-01-15|     120.5|
|         2|     Jane|   Smith|janesmith@example...|Female| 34|     Canada|    2023-02-01|2023-02-12|      89.9|
|         3|      Bob|   Brown|bobbrown@example.com|  Male| 45|         UK|    2023-01-20|2023-01-25|     305.6|
|         4|     Lisa|   White|lisawhite@example...|Female| 23|        USA|    2023-03-15|2023-03-16|     150.0|
|         5|     Mark|   Black|markblack@example...|  Male| 37|  Australia|    2023-02-10|2023-02-20|     204.7|
|         6|    Emily|   Green|emilygreen@exampl...|Female| 29|New Zealand|    2023-03-05|2023-0

Refernce Spark: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.fillna.html

In [184]:
# Prints the Schema of the DF same as pandas .info() / .describe()
df.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- AccountCreated: date (nullable = true)
 |-- LastLogin: string (nullable = true)
 |-- TotalSpend: double (nullable = true)



In [215]:
# It samples fraction of total rows - meaning gives out randomly fraction % of total rows
# using the second optional argument seed
df.sample(fraction = 0.2, seed = 3).show()

+----------+---------+--------+--------------------+------+---+---------+--------------+----------+----------+
|CustomerID|FirstName|LastName|               Email|Gender|Age|  Country|AccountCreated| LastLogin|TotalSpend|
+----------+---------+--------+--------------------+------+---+---------+--------------+----------+----------+
|         5|     Mark|   Black|markblack@example...|  Male| 37|Australia|    2023-02-10|2023-02-20|     204.7|
+----------+---------+--------+--------------------+------+---+---------+--------------+----------+----------+



In [216]:
# Select is used to select particular colums
### df.columns returns a list of columns
df.select( df.columns[0]).sample(0.5).show()

+----------+
|CustomerID|
+----------+
|         1|
|         3|
|         4|
|         5|
|         6|
|        10|
|        11|
|        13|
+----------+



In [188]:
# We can use filter and where interchangeably
df.where(df.Country=='USA').show()

+----------+---------+--------+--------------------+------+---+-------+--------------+----------+----------+
|CustomerID|FirstName|LastName|               Email|Gender|Age|Country|AccountCreated| LastLogin|TotalSpend|
+----------+---------+--------+--------------------+------+---+-------+--------------+----------+----------+
|         1|     John|     Doe| johndoe@example.com|  Male| 28|    USA|    2023-01-01|2023-01-15|     120.5|
|         4|     Lisa|   White|lisawhite@example...|Female| 23|    USA|    2023-03-15|2023-03-16|     150.0|
|         8|   Olivia|   Scott|oliviascott@examp...|Female| 19|    USA|    2023-03-12|2023-03-13|      95.6|
|        10|      Ava|   Jones|avajones@example.com|Female| 31|    USA|    2023-02-15|2023-02-18|     240.9|
+----------+---------+--------+--------------------+------+---+-------+--------------+----------+----------+



In [189]:
df.filter(df.Country=='USA').show()

+----------+---------+--------+--------------------+------+---+-------+--------------+----------+----------+
|CustomerID|FirstName|LastName|               Email|Gender|Age|Country|AccountCreated| LastLogin|TotalSpend|
+----------+---------+--------+--------------------+------+---+-------+--------------+----------+----------+
|         1|     John|     Doe| johndoe@example.com|  Male| 28|    USA|    2023-01-01|2023-01-15|     120.5|
|         4|     Lisa|   White|lisawhite@example...|Female| 23|    USA|    2023-03-15|2023-03-16|     150.0|
|         8|   Olivia|   Scott|oliviascott@examp...|Female| 19|    USA|    2023-03-12|2023-03-13|      95.6|
|        10|      Ava|   Jones|avajones@example.com|Female| 31|    USA|    2023-02-15|2023-02-18|     240.9|
+----------+---------+--------+--------------------+------+---+-------+--------------+----------+----------+



In [217]:
# Drop - drops the coloumn specified and returns a new dataframe, 
# it doesn't make modifications inplace
df2 = df.drop("LastLogin")

In [218]:
df2.show()

+----------+---------+--------+--------------------+------+---+-----------+--------------+----------+
|CustomerID|FirstName|LastName|               Email|Gender|Age|    Country|AccountCreated|TotalSpend|
+----------+---------+--------+--------------------+------+---+-----------+--------------+----------+
|         1|     John|     Doe| johndoe@example.com|  Male| 28|        USA|    2023-01-01|     120.5|
|         2|     Jane|   Smith|janesmith@example...|Female| 34|     Canada|    2023-02-01|      89.9|
|         3|      Bob|   Brown|bobbrown@example.com|  Male| 45|         UK|    2023-01-20|     305.6|
|         4|     Lisa|   White|lisawhite@example...|Female| 23|        USA|    2023-03-15|     150.0|
|         5|     Mark|   Black|markblack@example...|  Male| 37|  Australia|    2023-02-10|     204.7|
|         6|    Emily|   Green|emilygreen@exampl...|Female| 29|New Zealand|    2023-03-05|    255.25|
|         7|    James|    Hall|jameshall@example...|  Male| 41|         UK|    202

In [232]:
# Dropna - drops all the rows on specified condition how - can take two values
# viz any and all.
#We can pass optional subset parameter, to apply how only on those subset of cols
df3 = df.dropna(how='all')

In [221]:
df3.show()

+----------+---------+--------+--------------------+------+---+-----------+--------------+----------+----------+
|CustomerID|FirstName|LastName|               Email|Gender|Age|    Country|AccountCreated| LastLogin|TotalSpend|
+----------+---------+--------+--------------------+------+---+-----------+--------------+----------+----------+
|         1|     John|     Doe| johndoe@example.com|  Male| 28|        USA|    2023-01-01|2023-01-15|     120.5|
|         2|     Jane|   Smith|janesmith@example...|Female| 34|     Canada|    2023-02-01|2023-02-12|      89.9|
|         3|      Bob|   Brown|bobbrown@example.com|  Male| 45|         UK|    2023-01-20|2023-01-25|     305.6|
|         4|     Lisa|   White|lisawhite@example...|Female| 23|        USA|    2023-03-15|2023-03-16|     150.0|
|         5|     Mark|   Black|markblack@example...|  Male| 37|  Australia|    2023-02-10|2023-02-20|     204.7|
|         6|    Emily|   Green|emilygreen@exampl...|Female| 29|New Zealand|    2023-03-05|2023-0

In [222]:
# dtypes returns a tuple of col, dtype of col
df3.dtypes

[('CustomerID', 'int'),
 ('FirstName', 'string'),
 ('LastName', 'string'),
 ('Email', 'string'),
 ('Gender', 'string'),
 ('Age', 'int'),
 ('Country', 'string'),
 ('AccountCreated', 'date'),
 ('LastLogin', 'string'),
 ('TotalSpend', 'double')]

In [223]:
df3.fillna({'TotalSpend':10}).show()
# Ref: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.fillna.html

+----------+---------+--------+--------------------+------+---+-----------+--------------+----------+----------+
|CustomerID|FirstName|LastName|               Email|Gender|Age|    Country|AccountCreated| LastLogin|TotalSpend|
+----------+---------+--------+--------------------+------+---+-----------+--------------+----------+----------+
|         1|     John|     Doe| johndoe@example.com|  Male| 28|        USA|    2023-01-01|2023-01-15|     120.5|
|         2|     Jane|   Smith|janesmith@example...|Female| 34|     Canada|    2023-02-01|2023-02-12|      89.9|
|         3|      Bob|   Brown|bobbrown@example.com|  Male| 45|         UK|    2023-01-20|2023-01-25|     305.6|
|         4|     Lisa|   White|lisawhite@example...|Female| 23|        USA|    2023-03-15|2023-03-16|     150.0|
|         5|     Mark|   Black|markblack@example...|  Male| 37|  Australia|    2023-02-10|2023-02-20|     204.7|
|         6|    Emily|   Green|emilygreen@exampl...|Female| 29|New Zealand|    2023-03-05|2023-0

In [225]:
# We can apply UDF to dataframe by using foreach
# ref :https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.foreach.html
def func(df):
    print(df.FirstName.lower())

df3.foreach(func)

john
jane
bob
lisa
mark
emily
james
olivia
william
ava
noah
mia
ethan
emma
michael


The .collect() method in PySpark is used to retrieve all the data from a DataFrame or RDD and bring it into the driver program's memory, which is generally not recommended for large datasets. This method returns the data as a list in the driver program.

While .collect() can be useful for small datasets that fit into the memory of the driver program, it should be used with caution for large datasets. Here are some considerations:

Memory Usage: .collect() brings all the data into the driver program's memory. If the dataset is large, it can lead to memory issues or even crashes.

Performance: Transferring large amounts of data from the distributed cluster to the driver program can be time-consuming and can affect the performance of your application.

Out of Memory Errors: If the dataset is too large to fit into the available memory of the driver program, it may result in out-of-memory errors.

In general, it's recommended to perform operations on distributed data using PySpark's transformations and actions without collecting data into the driver program unless necessary. If you need to analyze or visualize the data in the driver program, consider using techniques like sampling or aggregating the data before calling .collect().


In [226]:
for elem in df3.select(df3.FirstName).collect():
    print(elem.asDict()['FirstName'].lower())

john
jane
bob
lisa
mark
emily
james
olivia
william
ava
noah
mia
ethan
emma
michael


In [234]:
# withColumn addsd new Coloumn to dataframe returns new dataframe, if col name is same
# it will replace the existing col
from pyspark.sql.functions import lower
df3 = df3.withColumn("FirstName", lower(df3["FirstName"]))

In [235]:
df3.show()

+----------+---------+--------+--------------------+------+---+-----------+--------------+----------+----------+
|CustomerID|FirstName|LastName|               Email|Gender|Age|    Country|AccountCreated| LastLogin|TotalSpend|
+----------+---------+--------+--------------------+------+---+-----------+--------------+----------+----------+
|         1|     john|     Doe| johndoe@example.com|  Male| 28|        USA|    2023-01-01|2023-01-15|     120.5|
|         2|     jane|   Smith|janesmith@example...|Female| 34|     Canada|    2023-02-01|2023-02-12|      89.9|
|         3|      bob|   Brown|bobbrown@example.com|  Male| 45|         UK|    2023-01-20|2023-01-25|     305.6|
|         4|     lisa|   White|lisawhite@example...|Female| 23|        USA|    2023-03-15|2023-03-16|     150.0|
|         5|     mark|   Black|markblack@example...|  Male| 37|  Australia|    2023-02-10|2023-02-20|     204.7|
|         6|    emily|   Green|emilygreen@exampl...|Female| 29|New Zealand|    2023-03-05|2023-0

In [236]:
# Now withColumn is used to transform only one col, but withColoumns can be used to
# apply transformations to many columns at once.
from pyspark.sql.functions import upper
df3.withColumns({"FirstName": lower(df3['FirstName']), "LastName": upper(df3['LastName'])} ).show()

+----------+---------+--------+--------------------+------+---+-----------+--------------+----------+----------+
|CustomerID|FirstName|LastName|               Email|Gender|Age|    Country|AccountCreated| LastLogin|TotalSpend|
+----------+---------+--------+--------------------+------+---+-----------+--------------+----------+----------+
|         1|     john|     DOE| johndoe@example.com|  Male| 28|        USA|    2023-01-01|2023-01-15|     120.5|
|         2|     jane|   SMITH|janesmith@example...|Female| 34|     Canada|    2023-02-01|2023-02-12|      89.9|
|         3|      bob|   BROWN|bobbrown@example.com|  Male| 45|         UK|    2023-01-20|2023-01-25|     305.6|
|         4|     lisa|   WHITE|lisawhite@example...|Female| 23|        USA|    2023-03-15|2023-03-16|     150.0|
|         5|     mark|   BLACK|markblack@example...|  Male| 37|  Australia|    2023-02-10|2023-02-20|     204.7|
|         6|    emily|   GREEN|emilygreen@exampl...|Female| 29|New Zealand|    2023-03-05|2023-0

Using Bracket Notation (df["ColumnName"]):

This is the most explicit and versatile way to reference a column.
It allows you to use column names with spaces or special characters, e.g., df["First Name"].
It is used for both selecting and updating columns.

Using Dot Notation (df.ColumnName):

This notation is convenient and concise.
It is suitable for column names without spaces or special characters.
It is primarily used for selecting columns.

Using col Function (col("ColumnName")):

PySpark provides a col function from the pyspark.sql.functions module, which can be used to reference a column.
It is commonly used when applying functions to columns or in the context of expressions.

Using expr Function (expr("ColumnName")):

The expr function is used to create expressions involving columns.
It allows you to use SQL-like expressions.
Example:

python
Copy code
from pyspark.sql.functions import expr

expr("ColumnName + 1")
Using select Method (df.select("ColumnName")):

The select method is used to select one or more columns from a DataFrame.
It takes either column names as strings or column expressions.
Example:

python
Copy code
df.select("ColumnName")
In your provided code snippet, you are using both bracket notation (df3['FirstName'] and df3['LastName']) and the withColumns method. The choice between these notations often depends on the context, personal preference, or the specific operation being performed. For example, using df["ColumnName"] is generally safe and explicit, while dot notation (df.ColumnName) is concise but may not work with certain column names. It's recommended to use the notation that suits your specific use case and ensures readability and clarity in your code.

In [238]:
# expr is another useful funciton which stands for expression.
# We can write SQL like functions into exper and can trasnform data using pysaprk
from pyspark.sql.functions import lower, initcap, upper, concat, col, expr
df.withColumns({"FirstName": initcap(df["FirstName"]),
                     "LastName": lower(df["LastName"]),
                     "Country": upper(df["Country"]),
                   "Full Name": expr("FirstName || ' ' ||LastName")}).select('Full Name').show()
df.select(df.Country).distinct().count()
x = df.fillna({"TotalSpend" : 50, "LastLogin": '2024'})

+---------------+
|      Full Name|
+---------------+
|       John Doe|
|     Jane Smith|
|      Bob Brown|
|     Lisa White|
|     Mark Black|
|    Emily Green|
|     James Hall|
|   Olivia Scott|
|William Johnson|
|      Ava Jones|
|       Noah Lee|
|      Mia Brown|
|    Ethan Clark|
|    Emma Wilson|
| Michael Martin|
+---------------+



In [239]:
df3.select(df3.Country).distinct().show()

+-----------+
|    Country|
+-----------+
|        USA|
|         UK|
|     Canada|
|New Zealand|
|  Australia|
+-----------+



In [206]:
df3.groupBy("Country").count().show()

+-----------+-----+
|    Country|count|
+-----------+-----+
|        USA|    4|
|         UK|    4|
|     Canada|    3|
|New Zealand|    2|
|  Australia|    2|
+-----------+-----+



In [240]:
df3.groupBy("Country").max().show()

+-----------+---------------+--------+---------------+
|    Country|max(CustomerID)|max(Age)|max(TotalSpend)|
+-----------+---------------+--------+---------------+
|        USA|             10|      31|          240.9|
|         UK|             15|      45|          320.8|
|     Canada|             12|      36|          180.4|
|New Zealand|             14|      42|          310.2|
|  Australia|             13|      39|          290.3|
+-----------+---------------+--------+---------------+




Pandas and PySpark are both powerful tools for working with data, but they serve different purposes and have distinct advantages depending on the scale of your data and the requirements of your tasks.

Pandas:
Single-node Processing: Pandas is designed for in-memory data processing on a single machine. It's excellent for smaller to moderately sized datasets that can fit into the memory of a single machine.

Ease of Use: Pandas provides an easy-to-use and expressive API, making it a popular choice for data analysis and manipulation. It is widely used in the data science and analytics community.

Rich Functionality: Pandas supports a broad range of operations, including data cleaning, transformation, aggregation, and visualization.

PySpark:
Distributed Processing: PySpark, on the other hand, is designed for distributed data processing and can scale horizontally to handle large datasets across a cluster of machines. It uses the Hadoop Distributed File System (HDFS) for storage and Apache Spark for computation.

Big Data Processing: PySpark is well-suited for processing big data where the data is distributed across multiple nodes. It can handle data sizes that exceed the capacity of a single machine.

Resilient Distributed Datasets (RDDs): PySpark uses RDDs, an abstraction for distributed data, making it fault-tolerant and scalable.

Parallelism: PySpark inherently supports parallelism, allowing it to perform operations on distributed data in parallel across a cluster of machines.

When to Use Which:
Pandas: Use Pandas for smaller to medium-sized datasets that fit into memory and for interactive data analysis and exploration.

PySpark: Use PySpark when dealing with large-scale distributed datasets that require horizontal scaling. It's particularly useful in a big data processing environment.

Combination: In some cases, you might use Pandas for initial data exploration and preprocessing on a subset of the data and then switch to PySpark for processing the entire dataset in a distributed manner.

In summary, the choice between Pandas and PySpark depends on the scale of your data and the computational resources available. For big data processing, PySpark's distributed nature is a significant advantage, while Pandas is more suitable for smaller datasets.

In [241]:
df3.groupBy("FirstName").avg('age').alias("Age").sort("FirstName").show()

+---------+--------+
|FirstName|avg(age)|
+---------+--------+
|      ava|    31.0|
|      bob|    45.0|
|    emily|    29.0|
|     emma|    42.0|
|    ethan|    39.0|
|    james|    41.0|
|     jane|    34.0|
|     john|    28.0|
|     lisa|    23.0|
|     mark|    37.0|
|      mia|    27.0|
|  michael|    33.0|
|     noah|    22.0|
|   olivia|    19.0|
|  william|    36.0|
+---------+--------+



In [242]:
df3 = df.groupBy("Country").agg({"Age":"mean"})

In [243]:
df3.withColumnRenamed("avg(Age)", "Age").show()

+-----------+------------------+
|    Country|               Age|
+-----------+------------------+
|        USA|             25.25|
|         UK|             35.25|
|     Canada|32.333333333333336|
|New Zealand|              35.5|
|  Australia|              38.0|
+-----------+------------------+



In [244]:
# When we reassign a df result to another df it creates a new dataframe,
# and to verify it we can usee .rdd.id() function to get its unique id
df3.rdd.id()

257

In [245]:
df.rdd.id()

132