In [None]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     316.9/316.9 MB 2.5 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=95527b77e0d17f047ac7c727625f75d3ed82e14e4474a39f556152c3bbf67b09
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
# Practical 1: PySpark Data Reading and Display Example

# SparkSession is the entry point used below for reading the CSV
from pyspark.sql import SparkSession

# Reuse the active session if there is one, otherwise start a new one
spark = (
    SparkSession.builder
    .appName("DataReadingExample")
    .getOrCreate()
)

# Load the CSV: the first line supplies the column names and the
# column types are inferred from the data
data_file = "songs.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_file)
)

# Preview only the first three rows
df.show(3)

# Stop the session now that the preview has been printed
spark.stop()

+----+------+--------+-------+
|song| title|  artist|  album|
+----+------+--------+-------+
|   1|Song 1|Artist 1|Album 1|
|   2|Song 2|Artist 2|Album 2|
|   3|Song 3|Artist 3|Album 3|
+----+------+--------+-------+
only showing top 3 rows



In [None]:
# Practical 2: Combining DataFrames with PySpark

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit  # imported here but not referenced in this cell

# Reuse the active session if there is one, otherwise start a new one
spark = (
    SparkSession.builder
    .appName("CombiningDataFramesExample")
    .getOrCreate()
)

# Two small frames that share the same (id, name) column layout
columns = ["id", "name"]
data1 = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
data2 = [(4, "David"), (5, "Eve"), (6, "Frank")]
first_people_df = spark.createDataFrame(data1, schema=columns)
second_people_df = spark.createDataFrame(data2, schema=columns)

# Union: append the rows of the second frame below the rows of the first
print("Union")
first_people_df.union(second_people_df).show()

# A third frame that maps the same ids to a subject
subjects_columns = ["id", "subject"]
data3 = [(1, "Math"), (2, "English"), (3, "Science")]
subjects_df = spark.createDataFrame(data3, schema=subjects_columns)

# Inner join on "id": only ids present in both frames are kept
print("Inner Join")
first_people_df.join(subjects_df, on="id", how="inner").show()

# Stop the session now that both results have been printed
spark.stop()

Union
+---+-------+
| id|   name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
|  4|  David|
|  5|    Eve|
|  6|  Frank|
+---+-------+

Inner Join
+---+-------+-------+
| id|   name|subject|
+---+-------+-------+
|  1|  Alice|   Math|
|  2|    Bob|English|
|  3|Charlie|Science|
+---+-------+-------+



In [None]:
# Practical 3: Filter, map and reduce operations on an RDD

from pyspark.sql import SparkSession

# Reuse the active session if there is one, otherwise start a new one
spark = (
    SparkSession.builder
    .appName("MapReduceExample")
    .getOrCreate()
)

# Distribute the integers 1..9 as an RDD (Resilient Distributed Dataset)
data = list(range(1, 10))
rdd = spark.sparkContext.parallelize(data)

# collect() brings every element back to the driver as a Python list
collected_data = rdd.collect()
print("Collected Data:", collected_data)

# filter keeps the elements for which the predicate is true, i.e. the even numbers
filtered_data = rdd.filter(lambda n: not n % 2).collect()
print("Filtered Data (Even numbers):", filtered_data)

# map transforms each element independently; here each number is squared
mapped_data = rdd.map(lambda n: n ** 2).collect()
print("Mapped Data (Squared):", mapped_data)

# reduce folds the elements pairwise into a single value; here their sum
reduced_result = rdd.reduce(lambda total, n: total + n)
print("Reduced Result (Sum):", reduced_result)

# Stop the session now that all results have been printed
spark.stop()

Collected Data: [1, 2, 3, 4, 5, 6, 7, 8, 9]
Filtered Data (Even numbers): [2, 4, 6, 8]
Mapped Data (Squared): [1, 4, 9, 16, 25, 36, 49, 64, 81]
Reduced Result (Sum): 45


In [None]:
# Practical 4: Creating a spark session using the configuration and Dataframe creation
from pyspark.sql import SparkSession

# Build the session with spark.executor.memory explicitly set to "2g"
spark = (
    SparkSession.builder
    .appName("CustomConfigExample")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

# Build a small (id, name) DataFrame from in-memory tuples
columns = ["id", "name"]
data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
df = spark.createDataFrame(data, schema=columns)

# Print the DataFrame as a table
df.show()

# Stop the session now that the table has been printed
spark.stop()

+---+-------+
| id|   name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
+---+-------+



In [None]:
# Practical 5: PySpark Word Count and Data Manipulation Example

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("WordCountExample").getOrCreate()

# Create an RDD with one element per line of the input text file
# (point `text_file` at your own file to count a different document)
text_file = "textfile.txt"
rdd = spark.sparkContext.textFile(text_file)

# Tokenise each line, pair every word with 1, then sum the 1s per word.
# str.split() with no argument splits on any run of whitespace and never
# yields empty strings, so blank lines, tabs and repeated spaces do not add
# a bogus "" entry to the counts (which split(" ") did).
word_counts = (
    rdd.flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .collect()
)

# Display the word count results, one "word: count" pair per line
for word, count in word_counts:
    print(f"{word}: {count}")

# Stop the SparkSession
spark.stop()

name: 1
is: 1
prachi: 2
i: 1
am: 1
my: 1


In [None]:
# Practical 7: Creating a temporary view of DataFrame to use SQL Query with Spark Session

from pyspark.sql import SparkSession

# Reuse the active session if there is one, otherwise start a new one
spark = (
    SparkSession.builder
    .appName("SparkSQLExample")
    .getOrCreate()
)

# Load the CSV: the first line supplies the column names and the
# column types are inferred from the data
data_file = "songs.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_file)
)

# Expose the DataFrame to SQL under the name "my_temp_view"
df.createOrReplaceTempView("my_temp_view")

# Query the view with plain SQL and print the matching rows
result = spark.sql("SELECT * FROM my_temp_view WHERE song = 4")
result.show()

# Stop the session now that the query result has been printed
spark.stop()

+----+------+--------+-------+
|song| title|  artist|  album|
+----+------+--------+-------+
|   4|Song 4|Artist 4|Album 4|
+----+------+--------+-------+



In [None]:
# Practical 8: Creating user defined function with Spark Session

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Reuse the active session if there is one, otherwise start a new one
spark = (
    SparkSession.builder
    .appName("UDFExample")
    .getOrCreate()
)

# One-column sample frame: each 1-tuple is a row holding a single name
columns = ["name"]
data = [("Alice",), ("Bob",), ("Charlie",)]
df = spark.createDataFrame(data, schema=columns)

# Define the plain-Python function that will be wrapped as a UDF
def greet(name):
    """Build a greeting for ``name``.

    Args:
        name: The name to greet, or ``None`` for a missing value.

    Returns:
        ``"Hello, <name>!"`` for a non-``None`` name; ``None`` when ``name``
        is ``None``.
    """
    # Propagate missing values instead of formatting them into the
    # literal text "Hello, None!".
    if name is None:
        return None
    return f"Hello, {name}!"

# Wrap greet as a Spark UDF whose declared return type is a string
greet_udf = udf(greet, returnType=StringType())

# Add a "greeting" column computed by calling the UDF on the "name" column
df_with_greeting = df.withColumn(
    colName="greeting",
    col=greet_udf(df["name"]),
)

# Print the resulting table
df_with_greeting.show()

# Stop the session now that the table has been printed
spark.stop()

+-------+---------------+
|   name|       greeting|
+-------+---------------+
|  Alice|  Hello, Alice!|
|    Bob|    Hello, Bob!|
|Charlie|Hello, Charlie!|
+-------+---------------+

