# PySpark (Local)
## What is PySpark
PySpark is the Python API for [Apache Spark](https://spark.apache.org/). PySpark enables developers to write Spark applications using Python, providing access to Spark’s rich set of features and capabilities through Python language. With its rich set of features, robust performance, and extensive ecosystem, PySpark has become a popular choice for data engineers, data scientists, and developers working with big data and distributed computing. PySpark is very well used in the Data Science and Machine Learning community as there are many widely used data science libraries written in Python including NumPy, and TensorFlow. Also used due to its efficient processing of large datasets.

Spark has a multi-language engine, that provides APIs (Application Programming Interfaces) and libraries for several programming languages like Java, Scala, Python, and R, allowing developers to work with Spark using the language they are most comfortable with.

- Scala: Spark’s primary and native language is Scala. Many of Spark’s core components are written in Scala, and it provides the most extensive API for Spark.
- Java: Spark provides a Java API that allows developers to use Spark within Java applications. Java developers can access most of Spark’s functionality through this API.
- R: Spark also offers an R API, enabling R users to work with Spark data and perform distributed data analysis using their familiar R language.
- Python: Spark offers a Python API, called PySpark, which is popular among data scientists and developers who prefer Python for data analysis and machine learning tasks. PySpark provides a Pythonic way to interact with Spark.

## Iinitialize PySpark Environment

This example requires spark environment. Please make sure that you've installed spark application on your environment before you begin. This is simple script to verify if a spark application is installed and configured.
```
# It is required to set the SPARK_HOME environment variable.
# Please make sure the variable indicates to the right path to your spark.
if [ -z $SPARK_HOME ] ; then
  export SPARK_HOME="$HOME/.local/lib/spark-3.5.4-bin-hadoop3"
fi
```

For more details, please refer to the [Install Spark](https://github.com/Young-ook/data-lab-on-wsl?tab=readme-ov-file#install-spark) and follow the instructions.

In [1]:
# validate findspark
!pip list | grep spark

findspark                 2.0.1


In [2]:
import findspark
findspark.init()

## PySpark on Local 

In [3]:
import pyspark
from pyspark.sql import SparkSession

# create a new spark context
#sc = pyspark.SparkContext(master="local", appName="pyspark-basic")
#spark = SparkSession(sc)
#
# or
spark = SparkSession.builder.master("local[1]").appName("pyspark-basic").getOrCreate()

# display spark session (local master)
spark

25/01/29 15:26:37 WARN Utils: Your hostname, emma resolves to a loopback address: 127.0.1.1; using 172.18.245.201 instead (on interface eth0)
25/01/29 15:26:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/29 15:26:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## DataFrame example
### What is DataFrame
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset\[Row\]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

### Create DataFrame

In [4]:
# change the log level
spark.sparkContext.setLogLevel("ERROR")

In [5]:
# create DataFrame
data = [('001','Smith','M',40,'DA',4000),
        ('002','Rose','M',35,'DA',3000),
        ('003','Williams','M',30,'DE',2500),
        ('004','Anne','F',30,'DE',3000),
        ('005','Mary','F',35,'BE',4000),
        ('006','James','M',30,'FE',3500)]

columns = ["cd","name","gender","age","div","salary"]
df = spark.createDataFrame(data = data, schema = columns)

df.printSchema()
df.show()

root
 |-- cd: string (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: long (nullable = true)
 |-- div: string (nullable = true)
 |-- salary: long (nullable = true)



                                                                                

+---+--------+------+---+---+------+
| cd|    name|gender|age|div|salary|
+---+--------+------+---+---+------+
|001|   Smith|     M| 40| DA|  4000|
|002|    Rose|     M| 35| DA|  3000|
|003|Williams|     M| 30| DE|  2500|
|004|    Anne|     F| 30| DE|  3000|
|005|    Mary|     F| 35| BE|  4000|
|006|   James|     M| 30| FE|  3500|
+---+--------+------+---+---+------+



### Load from file
By utilizing `DataFrameReader.csv("path")` or `format("csv").load("path")` methods, you can read a CSV file into a PySpark DataFrame. These methods accept a file path as their parameter. When using the format(“csv”) approach, you should specify data sources like *csv* or *org.apache.spark.sql.csv*.

Download the zipcode.csv file form the spark examples [repository](https://github.com/spark-examples/pyspark-examples/blob/master/resources/zipcodes.csv).

In [6]:
!curl -LO https://raw.githubusercontent.com/spark-examples/pyspark-examples/refs/heads/master/resources/zipcodes.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  3035  100  3035    0     0  57410      0 --:--:-- --:--:-- --:--:-- 58365


In [7]:
# read CSV File
df = spark.read.csv("./zipcodes.csv")
# or
# alternatively, you can use the format().load()
#df = spark.read.format("csv").load("./zipcodes.csv")
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)



If you have a header with column names on your input file, you need to explicitly specify True for header option using option("header",True) not mentioning this, the API treats header as a data record.

In [8]:
 # use header record for column names
df_wh = spark.read.option("header", True).option("inferSchema",True).csv("./zipcodes.csv")
df_wh.head()
df_wh.show(5)

+------------+-------+-----------+-------------------+-----+--------------+-----+------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-----+
|RecordNumber|Zipcode|ZipCodeType|               City|State|  LocationType|  Lat|  Long|Xaxis|Yaxis|Zaxis|WorldRegion|Country|        LocationText|            Location|Decommisioned|TaxReturnsFiled|EstimatedPopulation|TotalWages|Notes|
+------------+-------+-----------+-------------------+-----+--------------+-----+------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-----+
|           1|    704|   STANDARD|        PARC PARQUE|   PR|NOT ACCEPTABLE|17.96|-66.22| 0.38|-0.87|  0.3|         NA|     US|     Parc Parque, PR|NA-US-PR-PARC PARQUE|        false|           NULL|               NULL|      NULL| NULL|
|           2|    704|   STANDARD|PASEO COSTA DEL SUR|  

In [9]:
# stop the current spark session before connecting to standalone cluster
spark.stop()

# Additional Resources
- [Apache Spark Examples](https://spark.apache.org/examples.html)
- [Spark SQL, DataFrames and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)