In [22]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row

In [23]:
import pandas as pd

### Initiate SparkSession

In [24]:
spark_session = SparkSession.builder \
    .appName("MySparkSession") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .enableHiveSupport() \
    .getOrCreate()

23/07/24 16:00:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Exlore Spark Context

In [25]:
spark_context = spark_session.sparkContext
print(spark_context)

<SparkContext master=local[*] appName=MySparkSession>


In [26]:
print(spark_context.version)
print(spark_context.pythonVer)
print(spark_context.master)
print(str(spark_context.sparkHome))
print(str(spark_context.sparkUser()))
print(spark_context.appName)
print(spark_context.applicationId)
print(spark_context.defaultParallelism)
print(spark_context.defaultMinPartitions)

3.4.1
3.11
local[*]
None
manulabricole
MySparkSession
local-1690207239678
10
2


In [27]:
print("Spark Web UI: http://localhost:4040")

Spark Web UI: http://localhost:4040


### Play with RDD

#### Create RDD object

In [28]:
rdd1 = spark_session.sparkContext.parallelize([("a", 7), ("a", 2), ("b", 2)])
rdd2 = spark_session.sparkContext.parallelize([("value4", 2), ("value5", 1), ("value6", 1)])
rdd3 = spark_session.sparkContext.parallelize(range(100))
rdd4 = spark_session.sparkContext.parallelize([("key1", [1, 2, 3]), ("key2", [4, 5])])
rdd4 = spark_session.sparkContext.parallelize([("key1", [1, 2, 3]), ("key2", [4, 5])])

In [29]:
rdd1.map(lambda x: x+(x[1],x[0])).collect()

                                                                                

[('a', 7, 7, 'a'), ('a', 2, 2, 'a'), ('b', 2, 2, 'b')]

In [30]:
rdd5 = rdd2.flatMap(lambda x: x+(x[1],x[0]))
rdd5.collect()

['value4', 2, 2, 'value4', 'value5', 1, 1, 'value5', 'value6', 1, 1, 'value6']

#### Dataframe

In [31]:
data = [("Python", 100), ("Java", 200), ("Scala", 50)]
columns = ["language", "users_count"]

rdd = spark_session.sparkContext.parallelize(data)
rows = rdd.map(lambda x: Row(language=x[0], users_count=int(x[1])))

# Create DataFrame
df = spark_session.createDataFrame(rows)

# Print DataFrame
df.show()

+--------+-----------+
|language|users_count|
+--------+-----------+
|  Python|        100|
|    Java|        200|
|   Scala|         50|
+--------+-----------+



In [32]:
rdd.getNumPartitions()

10

In [33]:
rdd.countByKey()

defaultdict(int, {'Python': 1, 'Java': 1, 'Scala': 1})

### Decouverte des listes

In [48]:
l = [1, 2, 3, 4, 5]

In [49]:
rdd = spark_session.sparkContext.parallelize(l)

#### Lazy Evaluation

In [50]:
mapped_rdd = rdd.map(lambda x: x * 2)

In [51]:
filtered_rdd = mapped_rdd.filter(lambda x: x % 4 == 0) # Not executed yet because not called. Just a execution plan

In [52]:
filtered_rdd.collect() # Now spark will run the filter method

[4, 8]

In [53]:
ll = l*1000000
rdd_ll = spark_session.sparkContext.parallelize(ll)
mapped_rdd_ll = rdd_ll.map(lambda x: x * 2)
filtered_rdd_ll = mapped_rdd_ll.filter(lambda x: x % 4 == 0) # Not executed yet because not called. Just a execution plan

In [54]:
filtered_rdd_ll.collect()[:10] # Now spark will run the filter method

[4, 8, 4, 8, 4, 8, 4, 8, 4, 8]

In [55]:
# Stop the SparkSession when you're done
spark_session.stop()