# PySpark Project Notebook

### This notebook demonstrates how to use PySpark to load, manipulate, and analyze data with Spark DataFrames. Several CSV files are loaded, then filtered, aggregated, and cleaned of null values using the DataFrame API.

## 1. Importing Required Libraries

In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

## 2. Initializing SparkSession

In [3]:
# Create (or reuse) a SparkSession named "project"
spark = SparkSession.builder.appName("project").getOrCreate()

In [4]:
spark

## 3. Loading Data into a Spark DataFrame

In [17]:
# header=True uses the first row as column names; inferSchema=True guesses column types
data = spark.read.csv("data_p.csv", header=True, inferSchema=True)


In [24]:
data.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

## 1. Using SparkSession and 'data.csv', print all the distinct countries in ascending order with 'an' in their name

In [30]:
from pyspark.sql.functions import col

In [26]:
distinct_countries = (
    data.filter(col("Country").contains("an"))
    .select("Country")
    .distinct()
    .orderBy("Country")
)

In [27]:
distinct_countries.show()

+------------------+
|           Country|
+------------------+
|            Canada|
|   Channel Islands|
|European Community|
|           Finland|
|            France|
|           Germany|
|           Iceland|
|             Japan|
|           Lebanon|
|         Lithuania|
|       Netherlands|
|            Poland|
|       Switzerland|
+------------------+



## 2. Using SparkSession and 'data.csv', show the InvoiceNo, StockCode, and Description for the highest unit price.

In [33]:
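# Alias Spark's max so it does not shadow Python's built-in max()
from pyspark.sql.functions import col, max as spark_max

# Find the highest UnitPrice, then keep every row that has it (ties included)
max_price = data.agg(spark_max(col("UnitPrice"))).collect()[0][0]
max_rows = data.filter(col("UnitPrice") == max_price)

max_data = max_rows.select("InvoiceNo", "StockCode", "Description")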


In [34]:
max_data.show()

+---------+---------+-----------+
|InvoiceNo|StockCode|Description|
+---------+---------+-----------+
|  C556445|        M|     Manual|
+---------+---------+-----------+



## 3. Using SparkSession and the file fakefriends-header.csv, show each name's total number of friends. Order the results by name in alphabetical order.

In [35]:
fake = spark.read.csv("fakefriends-header.csv", header=True, inferSchema=True)

In [36]:
fake.show()

+------+--------+---+-------+
|userID|    name|age|friends|
+------+--------+---+-------+
|     0|    Will| 33|    385|
|     1|Jean-Luc| 26|      2|
|     2|    Hugh| 55|    221|
|     3|  Deanna| 40|    465|
|     4|   Quark| 68|     21|
|     5|  Weyoun| 59|    318|
|     6|  Gowron| 37|    220|
|     7|    Will| 54|    307|
|     8|  Jadzia| 38|    380|
|     9|    Hugh| 27|    181|
|    10|     Odo| 53|    191|
|    11|     Ben| 57|    372|
|    12|   Keiko| 54|    253|
|    13|Jean-Luc| 56|    444|
|    14|    Hugh| 43|     49|
|    15|     Rom| 36|     49|
|    16|  Weyoun| 22|    323|
|    17|     Odo| 35|     13|
|    18|Jean-Luc| 45|    455|
|    19|  Geordi| 60|    246|
+------+--------+---+-------+
only showing top 20 rows



In [62]:
# Total friends per name, sorted alphabetically by name
sum_friends = fake.groupBy("name").sum("friends").orderBy("name")


In [64]:
sum_friends.show(10)

+-------+------------+
|   name|sum(friends)|
+-------+------------+
|    Ben|        4888|
|Beverly|        6128|
|  Brunt|        4805|
|   Data|        7192|
| Deanna|        3479|
|  Dukat|        5317|
|   Elim|        2541|
|   Ezri|        4236|
| Geordi|        4728|
| Gowron|        2602|
+-------+------------+
only showing top 10 rows



## 4. Using SparkSession and the file ContainsNull.csv, explain the significance of the how and thresh arguments in the drop() function.

### how Argument:
#### The how argument of DataFrame.na.drop() (also available as DataFrame.dropna()) specifies the condition for dropping rows:
#### 1. "any" (the default): drop a row if it contains a null value in any column.
#### 2. "all": drop a row only if every one of its columns is null.

### thresh Argument:
#### The thresh argument sets the minimum number of non-null values a row must have to be kept. When thresh is given, it overrides how.

#### For example, with thresh=2 a row is kept only if it has at least 2 non-null values; rows with fewer are dropped.

## 5. Using SparkSession and the file ContainsNull.csv, fill the null sales values with the minimum sales value.

In [76]:
null = spark.read.csv("ContainsNull.csv", header=True, inferSchema=True)

In [77]:
null.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [79]:
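# Alias Spark's min so it does not shadow Python's built-in min()
from pyspark.sql.functions import col, min as spark_min

# Smallest non-null Sales value (aggregate functions ignore nulls)
min_values = null.agg(spark_min(col("Sales"))).collect()[0][0]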

In [91]:
# Fill nulls in the Sales column only; Id and Name are left unchanged
data_filled = null.fillna(min_values, subset=["Sales"])

data_filled.show()

## 6. Using SparkSession and the file appl_stock.csv, show the unique trade years in descending order with the output column name as year.

In [93]:
apple = spark.read.csv("appl_stock.csv", header=True, inferSchema=True)

In [94]:
apple.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [122]:
from pyspark.sql.functions import year, desc

# Add a "year" column extracted from the Date column
apple = apple.withColumn("year", year(apple["Date"]))


In [107]:
apple.show(5)

+----------+----------+----------+------------------+------------------+---------+------------------+----+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|year|
+----------+----------+----------+------------------+------------------+---------+------------------+----+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2010|
+----------+----------+----------+------------------+------------------+---------+------------------+----+
only showing top 5 rows



In [115]:
unique_value = apple.select("year").distinct().orderBy(desc("year"))

In [116]:
unique_value.show()

+----+
|year|
+----+
|2016|
|2015|
|2014|
|2013|
|2012|
|2011|
|2010|
+----+



## 7. Using SparkSession and the file appl_stock.csv, show the average trade volume for each year with the output column names and values as shown below.

In [120]:
# Total (not average) volume per year, as an intermediate check
sum_volume = apple.groupBy("year").sum("Volume")

In [121]:
sum_volume.show()

+----+-----------+
|year|sum(Volume)|
+----+-----------+
|2015|13063147500|
|2013|25605392400|
|2014|15914488100|
|2012|32991051100|
|2016| 9680671300|
|2010|37756231800|
|2011|31014834900|
+----+-----------+



In [159]:
from pyspark.sql import functions as F

# Average volume per year, formatted to 2 decimals with thousands separators
avg_volume = (
    apple.groupBy("year")
    .agg(F.format_number(F.avg("Volume"), 2).alias("Final Avg Volume"))
    .orderBy("year")
)



In [160]:
avg_volume.show()

+----+----------------+
|year|Final Avg Volume|
+----+----------------+
|2010|  149,826,316.67|
|2011|  123,074,741.67|
|2012|  131,964,204.40|
|2013|  101,608,700.00|
|2014|   63,152,730.56|
|2015|   51,837,886.90|
|2016|   38,415,362.30|
+----+----------------+



# Conclusion

### This notebook demonstrated how to use PySpark to load and analyze CSV datasets: filtering and de-duplicating values, finding the rows with the highest price, grouping and summing, handling null values with drop and fill, extracting years from dates, and computing total and average trade volumes by year. Because these operations run on Spark DataFrames, the same code can be applied to larger datasets distributed across a cluster.