In [1]:
%pip install pyspark




SparkSession:
SparkSession is the unified entry point introduced in Spark 2.0. It replaces the separate SQLContext and HiveContext and wraps a SparkContext.
It provides a higher-level API for working with Spark, particularly for DataFrame and SQL operations.
It is designed for structured data (DataFrames and, in Scala and Java, Datasets), a more abstract API than RDDs whose queries Spark can optimize.
It manages a SparkContext instance internally, which you can access via spark.sparkContext.

In [2]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.master("local").appName("Learning PySpark").getOrCreate()


In [3]:
data = [("Alice", 1), ("bob", 2), ("charlie", 3)]
columns = ["Name", "Value"]
df = spark.createDataFrame(data, columns)
df.show()

+-------+-----+
|   Name|Value|
+-------+-----+
|  Alice|    1|
|    bob|    2|
|charlie|    3|
+-------+-----+



In [4]:
df.filter(df["value"]>1).show()

+-------+-----+
|   Name|Value|
+-------+-----+
|    bob|    2|
|charlie|    3|
+-------+-----+



In [5]:
df.groupBy("value").count().show()

+-----+-----+
|value|count|
+-----+-----+
|    1|    1|
|    3|    1|
|    2|    1|
+-----+-----+



In [8]:
df = df.withColumn("Double", df["value"] * 2)
df.show()

+-------+-----+------+
|   Name|Value|Double|
+-------+-----+------+
|  Alice|    1|     2|
|    bob|    2|     4|
|charlie|    3|     6|
+-------+-----+------+



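You can join two DataFrames on a common column. The default join type is an inner join and string matching is case-sensitive, so in the example below only "Alice" appears in the result ("bob" does not match "Bob", and "charlie" has no counterpart).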

In [9]:
# Create another DataFrame
data2 = [("Alice", "NY"), ("Bob", "LA")]
columns2 = ["Name", "City"]

df2 = spark.createDataFrame(data2, columns2)

joined_df = df.join(df2, "Name")
joined_df.show()

+-----+-----+------+----+
| Name|Value|Double|City|
+-----+-----+------+----+
|Alice|    1|     2|  NY|
+-----+-----+------+----+





1.   Parquet files include schema information (metadata) along with the data, which defines the structure of the data (e.g., field names, data types). This makes it self-descriptive, allowing for easier integration and querying across different tools.
2.   Parquet stores data in a columnar format, which means that data is stored by column rather than by row. This allows for better compression and faster read performance, especially when you only need to access a subset of the columns in a large dataset.



In [10]:
# Save DataFrame as Parquet; mode("overwrite") lets the cell be re-run without a "path already exists" error
df.write.mode("overwrite").parquet("output.parquet")

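Basic SQL Operations
A global temporary view is registered in the system database global_temp, so queries must refer to it as global_temp.people.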

In [12]:
df.createOrReplaceGlobalTempView("people")


In [17]:
# Execute SQL query
result = spark.sql("SELECT Name, Value FROM global_temp.people WHERE Value > 1")
result.show()

+-------+-----+
|   Name|Value|
+-------+-----+
|    bob|    2|
|charlie|    3|
+-------+-----+



An RDD (Resilient Distributed Dataset) is the fundamental data structure in Spark and the backbone of its distributed computing capabilities. RDDs let you process data in parallel across a cluster and are fault-tolerant: if a partition is lost due to a node failure, Spark can recompute it from the transformations that produced it.

In [18]:
from pyspark.sql import SparkSession

# Initialize Spark session (getOrCreate() reuses the session created earlier if it is still active)
spark = SparkSession.builder.master("local").appName("RDD Basics").getOrCreate()

# Access the SparkContext from the SparkSession
sc = spark.sparkContext



The sc.parallelize function distributes a local collection (e.g., a list) across the partitions of a new RDD, so that operations on it can run in parallel on the nodes of a cluster. An optional second argument sets the number of partitions.

In [19]:
data= [1,2,3,4,5]
rdd = sc.parallelize(data)


Collect the RDD to check its contents. collect() returns every element to the driver, so use it only on small datasets.

In [21]:
rdd.collect()

[1, 2, 3, 4, 5]


In [28]:
# sc.textFile() reads line-oriented text, so on a binary Parquet file it returns unreadable bytes.
# Load Parquet through the DataFrame reader and take the underlying RDD of Row objects instead.
rdd_file = spark.read.parquet("output.parquet").rdd
rdd_file.collect()

RDD Transformations
Transformations are operations that create a new RDD from an existing one, for example by applying a function to each element. They are lazy: nothing is computed until an action is called.

In [29]:
rdd_map = rdd.map(lambda x: x*2)
rdd_map.collect()

[2, 4, 6, 8, 10]

In [30]:
rdd_filter = rdd.filter(lambda x: x % 2 == 0)
rdd_filter.collect()

[2, 4]

flatMap() – Similar to map(), but the function may return zero or more items for each input element, and the results are flattened into a single RDD:


In [31]:
# Split each element into multiple words
rdd_flatmap = sc.parallelize(["hello world", "apache spark"]).flatMap(lambda x: x.split(" "))

# Collect and show the result
print(rdd_flatmap.collect())


['hello', 'world', 'apache', 'spark']


reduce() – An action rather than a transformation: it reduces the RDD to a single value by combining elements with a function, which should be commutative and associative:

In [32]:
# Sum the elements of the RDD
sum_result = rdd.reduce(lambda x, y: x + y)

# Show the result
print(sum_result)

15


RDD Actions
Actions are operations that trigger the execution of the pending transformations and either return a value to the driver or write data to external storage.

In [34]:
# Collect the RDD data
collected_data = rdd.collect()



In [35]:
# Show the result
print(collected_data)

[1, 2, 3, 4, 5]


In [36]:
# Count the number of elements in the RDD
count_result = rdd.count()

# Show the result
print(count_result)

5


In [38]:
# Get the first element in the RDD
first_element = rdd.first()

# Show the result
print(first_element)

1


In [37]:
# Get the first 3 elements in the RDD
first_three = rdd.take(3)

# Show the result
print(first_three)

[1, 2, 3]


RDD Persistence
You can cache or persist an RDD to store it in memory for efficient reuse. This is useful if you plan to perform multiple actions on the same RDD.



In [39]:
# Mark the RDD to be cached in memory (caching itself is lazy)
rdd.cache()

# The first action computes and stores the partitions; later actions on rdd reuse them


ParallelCollectionRDD[38] at readRDDFromFile at PythonRDD.scala:289

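groupByKey() groups all values for each key.

reduceByKey() merges the values for each key with a reduction function. It combines values within each partition before the shuffle, so it is usually more efficient than groupByKey() for aggregations.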

In [44]:
# Note: this rebinds rdd to a new RDD of (key, value) pairs
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 1)])
grouped_rdd = rdd.groupByKey()
grouped_rdd.collect()

[('a', <pyspark.resultiterable.ResultIterable at 0x789f59242200>),
 ('b', <pyspark.resultiterable.ResultIterable at 0x789f59243a60>)]

In [45]:
# Using reduceByKey: Aggregates the values for each key (more efficient)
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
print(reduced_rdd.collect())

[('a', 2), ('b', 2)]


groupByKey() returns an iterable of values for each key (shown above as ResultIterable objects), whereas reduceByKey() combines the values into a single value per key using the function provided.

Example: Repartitioning and Coalescing RDDs
repartition(): Increases or decreases the number of partitions, always with a full shuffle.
coalesce(): Reduces the number of partitions and avoids a full shuffle by default, which makes it more efficient when reducing.

Why Partitioning Matters: When data is distributed across partitions, operations like map() and reduce() run in parallel on those partitions, which improves performance.

However, too many partitions add scheduling overhead, while too few leave available parallelism unused.