# PySpark

See the [PySpark installation instructions](https://github.com/boisalai/jupyter-notebooks/blob/main/pyspark.md).

## Example 1: NYC TLC Trip Record Data (taxi zone lookup)

In [8]:
import pyspark
from pyspark.sql import SparkSession

In [9]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2023-05-05 08:52:30--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Résolution de s3.amazonaws.com (s3.amazonaws.com)… 52.216.208.112, 52.216.217.200, 52.216.139.117, ...
Connexion à s3.amazonaws.com (s3.amazonaws.com)|52.216.208.112|:443… connecté.
requête HTTP transmise, en attente de la réponse… 200 OK
Taille : 12322 (12K) [application/octet-stream]
Sauvegarde en : « taxi+_zone_lookup.csv.3 »


2023-05-05 08:52:31 (23,3 MB/s) — « taxi+_zone_lookup.csv.3 » sauvegardé [12322/12322]



In [10]:
# Instantiate (or reuse) a Spark session: the object we use to interact with Spark.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

# The first line of the CSV holds the column names.
df = spark.read.csv('taxi+_zone_lookup.csv', header=True)

df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

## Example 2: Creating a DataFrame (with findspark)

In [1]:
import findspark

# Locate the Spark installation and make it importable. This must run before
# anything imports `pyspark` -- note Example 1 above does so, so on a fresh
# kernel run this cell first.
spark_home = findspark.find()
findspark.init(spark_home)
spark_home

'/opt/homebrew/Cellar/apache-spark/3.4.0/libexec'

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create (or reuse) the Spark session for this example.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/05 08:42:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# A session already exists (previous cell), so getOrCreate() hands it back and
# only runtime SQL configs are applied -- the appName/master set here do not
# take effect (see the warning printed below).
spark = (
    SparkSession.builder
    .appName('test')
    .master("local[*]")
    .getOrCreate()
)

23/05/05 08:42:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
# Column names, followed by the rows as (language, users_count) tuples.
columns = ["language", "users_count"]
data = [
    ("Java", "20000"),
    ("Python", "100000"),
    ("Scala", "3000"),
]

In [6]:
# Create the DataFrame; `columns` supplies the column names directly and
# Spark infers each column's type from `data`.
df = spark.createDataFrame(data, schema=columns)

In [7]:
# Display up to the first 20 rows as a plain-text table.
df.show(n=20)

[Stage 0:>                                                          (0 + 1) / 1]

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



                                                                                

In [12]:
# Inspect the PySpark-related environment variables the kernel inherited.
# A blank line in the output means the variable is unset (or empty).
!echo $PYSPARK_PYTHON
!echo $PYSPARK_DRIVER_PYTHON
!echo $PYTHONPATH



/opt/homebrew/Cellar/apache-spark/3.4.0/libexec/python/lib/py4j-0.10.9.7-src.zip:/opt/homebrew/Cellar/apache-spark/3.4.0/libexec/python/:/opt/homebrew/Cellar/python@3.10/3.10.11


In [18]:
# Version of the `python3` found on PATH. The PYTHONPATH printed above points at
# Homebrew's python@3.10, while this reports 3.11 -- presumably the mismatch the
# following env-var cells try to resolve (PySpark needs driver and worker Python
# versions to agree).
!python3 -V

Python 3.11.3


In [13]:
!export PYSPARK_PYTHON=python3
!export PYSPARK_DRIVER_PYTHON=python3

In [15]:
!export PYSPARK_PYTHON=/opt/homebrew/Cellar/python@3.11
!export PYSPARK_DRIVER_PYTHON=/opt/homebrew/Cellar/python@3.11

In [22]:
# PYSPARK_DRIVER_PYTHON=jupyter with PYSPARK_DRIVER_PYTHON_OPTS=lab makes the
# `pyspark` launcher script open Jupyter Lab as its driver. Setting them from
# inside an already running kernel achieves nothing (and `!export` runs in a
# throw-away subshell anyway). Use straight quotes: typographic quotes such as
# ’lab’ are passed to the shell as literal characters.
# Run these in a terminal instead:
#
#   export PYSPARK_DRIVER_PYTHON=jupyter
#   export PYSPARK_DRIVER_PYTHON_OPTS='lab'
#   pyspark

In [None]:
import os
import sys

# Environment for a Colab-style Linux VM (see the link below). These must be
# set before the first SparkSession is created to have any effect.
# setdefault: never clobber a value that already points at a working install --
# e.g. the Homebrew Spark 3.4.0 that findspark located earlier in this notebook.
os.environ.setdefault("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk-amd64")  # path to java jdk
os.environ.setdefault("SPARK_HOME", "/content/spark-3.1.1-bin-hadoop2.7")  # path to spark home
# Run driver and workers with the kernel's own interpreter so their Python
# versions match; a hard-coded /usr/bin/python3 may be a different version.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
# https://wenkangwei.github.io/2020/10/14/PySpark-0-Installation/

## Example 3: Grouped aggregation with a pandas UDF

In [None]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession

def main(spark):
    """Average ``v`` per ``id`` with a pandas UDF and print the collected rows.

    Args:
        spark: the SparkSession used to build the small sample DataFrame.
    """
    rows = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]
    frame = spark.createDataFrame(rows, ("id", "v"))

    # Series -> scalar pandas UDF: used inside groupby().agg(), it receives the
    # `v` values of one group as a pandas Series and returns a single double.
    # (Its name also names the result column, mean_udf(v).)
    @pandas_udf("double")
    def mean_udf(v: pd.Series) -> float:
        return v.mean()

    aggregated = frame.groupby("id").agg(mean_udf(frame['v']))
    print(aggregated.collect())

if __name__ == "__main__":
    # Inside a Jupyter kernel __name__ is "__main__", so this runs when the cell executes.
    main(spark=SparkSession.builder.getOrCreate())

In [None]:
import os

# Ship a packed virtualenv to the executors (unpacked there as ./environment)
# and run their Python workers from it.
# NOTE(review): pyspark_venv.tar.gz must already exist next to the notebook
# (presumably built with venv-pack) -- confirm before running.
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"

# spark.archives is not a runtime SQL config: getOrCreate() would hand back the
# session created earlier in this notebook and silently ignore it ("Using an
# existing Spark session; only runtime SQL configurations will take effect").
# Stop that session first so a new one starts with the archive attached.
previous_session = SparkSession.getActiveSession()
if previous_session is not None:
    previous_session.stop()

spark = SparkSession.builder.config(
    "spark.archives",  # 'spark.yarn.dist.archives' in YARN.
    "pyspark_venv.tar.gz#environment").getOrCreate()
main(spark)