PySpark integrates well with pandas. The PySpark DataFrame is conceptually similar to the pandas DataFrame, and it is easy to convert a pandas DataFrame to a PySpark DataFrame and vice versa, which makes integration between the two packages straightforward.

### Step 1
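1. Import pandas as pd
>This makes the pandas library available under the alias pd. It does not install anything, so pandas must already be installed in the environment.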


### Step 2
Create a pandas DataFrame


In [0]:
import pandas as pd 

# Create the DataFrame

data = {
  "country": ["Nigeria", "Egypt", "South Africa"],
  "River": ["Niger", "Nile", "Orange"]
}

data_dataframe = pd.DataFrame(data)
# display() is the Databricks notebook helper; pandas DataFrames have no .display() method
display(data_dataframe)

country,River
Nigeria,Niger
Egypt,Nile
South Africa,Orange


###  Convert the pandas DataFrame to PySpark


In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark DataFrame").getOrCreate()

pyspark_to_dataframe = spark.createDataFrame(data_dataframe)

pyspark_to_dataframe.display()


country,River
Nigeria,Niger
Egypt,Nile
South Africa,Orange


1. SparkSession.builder: This starts the process of configuring and building a new Spark Session.
2. .appName("Spark DataFrame"): The appName method sets a name for the Spark application.
3. .getOrCreate(): This method retrieves an existing Spark Session if one exists, or creates a new one if none is found, so an existing session is reused instead of a second one being started.

### Loading Data

import pandas as pd
# for pandas
pandas_dataframe = pd.read_csv("datasetFormat")
# for PySpark
spark_dataframe = spark.read.csv("datasetFormat", header=True, inferSchema=True)
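The two readers have different defaults: pandas treats the first row as the header and infers column types automatically, whereas Spark's CSV reader needs header=True and inferSchema=True to do the same. The sketch below illustrates the pandas side only, using an in-memory string in place of a real file; csv_text and rivers_df are hypothetical names, and "datasetFormat" above remains a placeholder for an actual path.

```python
import io

import pandas as pd

# Hypothetical CSV content standing in for a real file on disk.
csv_text = (
    "country,River,length_km\n"
    "Nigeria,Niger,4180\n"
    "Egypt,Nile,6650\n"
)

# The first row becomes the column names and numeric columns are
# inferred without any extra arguments.
rivers_df = pd.read_csv(io.StringIO(csv_text))

print(rivers_df.columns.tolist())
print(rivers_df.dtypes)
```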

### Select columns in pandas

In [0]:
import pandas as pd

president_df = pd.DataFrame({
    "presidents": ['George Washington',
'Abraham Lincoln', 'Franklin D. Roosevelt', 'John F. Kennedy', 'Barack Obama'],
    "Years": [1789, 1861, 1933, 1961, 2009]
})

president_df[["presidents", "Years"]]

Unnamed: 0,presidents,Years
0,George Washington,1789
1,Abraham Lincoln,1861
2,Franklin D. Roosevelt,1933
3,John F. Kennedy,1961
4,Barack Obama,2009


### Select columns in PySpark


In [0]:
# Convert to Spark DataFrame
pyspark_presidents = spark.createDataFrame(president_df)

pyspark_presidents.show()

+--------------------+-----+
|          presidents|Years|
+--------------------+-----+
|   George Washington| 1789|
|     Abraham Lincoln| 1861|
|Franklin D. Roose...| 1933|
|     John F. Kennedy| 1961|
|        Barack Obama| 2009|
+--------------------+-----+



In [0]:
pyspark_presidents.select("presidents").show()

+--------------------+
|          presidents|
+--------------------+
|   George Washington|
|     Abraham Lincoln|
|Franklin D. Roose...|
|     John F. Kennedy|
|        Barack Obama|
+--------------------+



### Filter in PySpark

In [0]:
pyspark_presidents.filter(pyspark_presidents.Years > 1900).show()

+--------------------+-----+
|          presidents|Years|
+--------------------+-----+
|Franklin D. Roose...| 1933|
|     John F. Kennedy| 1961|
|        Barack Obama| 2009|
+--------------------+-----+
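For comparison, the same filter in pandas can be written with a boolean mask. This is a minimal sketch that rebuilds the presidents data so it runs on its own; after_1900 is an illustrative name.

```python
import pandas as pd

president_df = pd.DataFrame({
    "presidents": ["George Washington", "Abraham Lincoln",
                   "Franklin D. Roosevelt", "John F. Kennedy", "Barack Obama"],
    "Years": [1789, 1861, 1933, 1961, 2009],
})

# The comparison yields a boolean Series; indexing with it keeps
# only the rows where the condition is True.
after_1900 = president_df[president_df["Years"] > 1900]

print(after_1900)
```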



In [0]:
# Count the rows for each year. show() prints the result and returns None, so there is nothing to assign.
pyspark_presidents.groupBy("Years").count().show()

+-----+-----+
|Years|count|
+-----+-----+
| 1789|    1|
| 1861|    1|
| 1933|    1|
| 1961|    1|
| 2009|    1|
+-----+-----+
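A pandas counterpart to this group-and-count might look like the sketch below. It assumes the same presidents data; years_count is an illustrative name.

```python
import pandas as pd

president_df = pd.DataFrame({
    "presidents": ["George Washington", "Abraham Lincoln",
                   "Franklin D. Roosevelt", "John F. Kennedy", "Barack Obama"],
    "Years": [1789, 1861, 1933, 1961, 2009],
})

# size() counts the rows in each group; reset_index turns the result
# back into a two-column DataFrame with a "count" column.
years_count = president_df.groupby("Years").size().reset_index(name="count")

print(years_count)
```

Every year appears once in this dataset, so each count is 1, matching the PySpark output.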



### Aggregation in PySpark

In [0]:
agg_data = {
    "Movies": ['Avengers', 'Frozen', 'Star Wars',
'The Lion King', 'Harry Potter'],
    "Revenue": [90000000, 70000000, 110000000, 80000000, 100000000]
}
# convert to pandas
agg_dataframe = pd.DataFrame(agg_data)
print("++++++++++++++++++++++++++++++++++++")
print("This is pandas output:" , agg_dataframe.head())
print("++++++++++++++++++++++++++++++++++++\n ")


# convert to pySpark
agg_pyspark = spark.createDataFrame(agg_dataframe)

print("This is pyspark output")
agg_pyspark.show()

++++++++++++++++++++++++++++++++++++
This is pandas output:           Movies    Revenue
0       Avengers   90000000
1         Frozen   70000000
2      Star Wars  110000000
3  The Lion King   80000000
4   Harry Potter  100000000
++++++++++++++++++++++++++++++++++++
 
This is pyspark output
+-------------+---------+
|       Movies|  Revenue|
+-------------+---------+
|     Avengers| 90000000|
|       Frozen| 70000000|
|    Star Wars|110000000|
|The Lion King| 80000000|
| Harry Potter|100000000|
+-------------+---------+



In [0]:
# Mean revenue per movie. show() prints the result and returns None, so there is nothing to assign.
agg_pyspark.groupBy("Movies").agg({"Revenue": "mean"}).show()

+-------------+-------------+
|       Movies|mean(Revenue)|
+-------------+-------------+
|     Avengers|        9.0E7|
|       Frozen|        7.0E7|
|    Star Wars|        1.1E8|
|The Lion King|        8.0E7|
| Harry Potter|        1.0E8|
+-------------+-------------+
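Each movie appears only once, so the per-group mean above is simply that movie's revenue. The sketch below shows the equivalent pandas aggregation together with an overall mean, which is where the aggregation actually combines rows; mean_revenue and overall_mean are illustrative names.

```python
import pandas as pd

agg_dataframe = pd.DataFrame({
    "Movies": ["Avengers", "Frozen", "Star Wars",
               "The Lion King", "Harry Potter"],
    "Revenue": [90000000, 70000000, 110000000, 80000000, 100000000],
})

# Mean revenue per movie, keeping "Movies" as a regular column.
mean_revenue = agg_dataframe.groupby("Movies", as_index=False)["Revenue"].mean()

# Mean revenue across all movies.
overall_mean = agg_dataframe["Revenue"].mean()

print(mean_revenue)
print(overall_mean)
```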



### Join in both Pandas and PySpark

In [0]:
import pandas as pd 
# Create a DataFrame for this join operation
joined_df = {
  "Job Title": ['Software Engineer', 'Data Analyst','Project Manager'],
  "Salary": [100000, 80000, 90000],
  "Location": ['New York', 'Chicago', 'Los Angeles']
                
}

# to DataFrame
joined_df = pd.DataFrame(joined_df)
print("This is pandas output:" , joined_df.head())
print("++++++++++++++++++++++++++++++++++++\n ")

it_salaries = spark.createDataFrame(joined_df)
print("This is pyspark output")
it_salaries.show()

This is pandas output:            Job Title  Salary     Location
0  Software Engineer  100000     New York
1       Data Analyst   80000      Chicago
2    Project Manager   90000  Los Angeles
++++++++++++++++++++++++++++++++++++
 
This is pyspark output
+-----------------+------+-----------+
|        Job Title|Salary|   Location|
+-----------------+------+-----------+
|Software Engineer|100000|   New York|
|     Data Analyst| 80000|    Chicago|
|  Project Manager| 90000|Los Angeles|
+-----------------+------+-----------+



In [0]:
# second dataset

joined_df2 = {
  "Job Title": ['Software Engineer', 'Data Analyst','Project Manager'],
  "Salary": [100000, 80000, 90000],
  "Location": ['New York','Angeles','Chicago']
}
joined_df2_pandas = pd.DataFrame(joined_df2)
print("This is pandas output:" , joined_df2_pandas.head())
print("++++++++++++++++++++++++++++++++++++\n ")

it_salaries2 = spark.createDataFrame(joined_df2_pandas)
print("This is pyspark output")
it_salaries2.show()

This is pandas output:            Job Title  Salary  Location
0  Software Engineer  100000  New York
1       Data Analyst   80000   Angeles
2    Project Manager   90000   Chicago
++++++++++++++++++++++++++++++++++++
 
This is pyspark output
+-----------------+------+--------+
|        Job Title|Salary|Location|
+-----------------+------+--------+
|Software Engineer|100000|New York|
|     Data Analyst| 80000| Angeles|
|  Project Manager| 90000| Chicago|
+-----------------+------+--------+



In [0]:
# Performing inner join
inner_join = it_salaries.join(it_salaries2, on='Job Title', how='inner')
inner_join.show()

+-----------------+------+-----------+------+--------+
|        Job Title|Salary|   Location|Salary|Location|
+-----------------+------+-----------+------+--------+
|Software Engineer|100000|   New York|100000|New York|
|     Data Analyst| 80000|    Chicago| 80000| Angeles|
|  Project Manager| 90000|Los Angeles| 90000| Chicago|
+-----------------+------+-----------+------+--------+



In [0]:
right_join = it_salaries.join(it_salaries2, on='Job Title', how='right')
right_join.select("Job Title").show()

+-----------------+
|        Job Title|
+-----------------+
|Software Engineer|
|     Data Analyst|
|  Project Manager|
+-----------------+
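The pandas counterpart of these joins is DataFrame.merge. The sketch below rebuilds both salary tables so it runs on its own; left_df, right_df, and the suffix strings are illustrative choices. Unlike the PySpark result above, which contains two columns named Salary and two named Location, pandas renames the overlapping columns using the suffixes.

```python
import pandas as pd

left_df = pd.DataFrame({
    "Job Title": ["Software Engineer", "Data Analyst", "Project Manager"],
    "Salary": [100000, 80000, 90000],
    "Location": ["New York", "Chicago", "Los Angeles"],
})

right_df = pd.DataFrame({
    "Job Title": ["Software Engineer", "Data Analyst", "Project Manager"],
    "Salary": [100000, 80000, 90000],
    "Location": ["New York", "Angeles", "Chicago"],
})

# Inner join on the shared key; overlapping column names get suffixes.
inner_join = left_df.merge(
    right_df, on="Job Title", how="inner", suffixes=("_left", "_right")
)

# Right join keeps every row of right_df.
right_join = left_df.merge(
    right_df, on="Job Title", how="right", suffixes=("_left", "_right")
)

print(inner_join.columns.tolist())
print(right_join["Job Title"].tolist())
```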



### Save Data

In [0]:
save_it_salaries = it_salaries.write.csv("salary_data.csv", header=True, mode= "overwrite")

---------------------------------------------------------------------------
UnsupportedOperationException             Traceback (most recent call last)
File <command-7055105001397511>, line 1
----> 1 save_it_salaries = it_salaries.write.csv("salary_data.csv", header=True, mode= "overwrite")

File /databricks/python/lib/python3.11/site-packages/pyspark/sql/connect/readwriter.py:807, in DataFrameWriter.csv(self, path, mode, compression, sep, quote, escape, header, nullValue, escapeQuotes, quoteAll, dateFormat, timestampFormat, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, charToEscapeQuoteEscaping, encoding, emptyValue, lineSep)
    788 self.mode(mode)
[1;32m    789[0m [38;