<a href="https://colab.research.google.com/github/AI2C-mlindholm/devops-recitation-1/blob/main/PySpark_Recitation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install PySpark into the notebook runtime; the leading "!" makes IPython run this line as a shell command.
!pip install pyspark

In [None]:
import pyspark as ps
import pyspark.sql.functions as f
from pyspark.sql.types import *
from pyspark.sql import SparkSession

In [None]:
from pyspark import SparkContext

# Create the SparkContext, or reuse the one already running.
# getOrCreate() returns the active context when this cell is executed again,
# whereas calling SparkContext() a second time raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

# RDD Functions

### Map

In [None]:
# Distribute the numeric strings "1".."5" as an RDD for the map examples.
data = sc.parallelize([str(digit) for digit in range(1, 6)])

In [None]:
# @title
# Without lambda: hand map() a named function instead of an inline lambda.
def parse_to_int(x):
    """Return the integer value of the numeric string *x*."""
    as_int = int(x)
    return as_int


parsed_data = data.map(parse_to_int)

In [None]:
# @title
# With lambda: the same string-to-int conversion as parse_to_int, written
# inline so both variants produce identical integers (not floats).
parsed_data = data.map(lambda x: int(x))

In [None]:
# collect() pulls every element of the RDD back to the driver as a Python list,
# which the notebook then displays. Only do this with small RDDs.
parsed_data.collect()


### Reduce by Key

In [None]:
# (fruit, sale amount) pairs; fruits repeat so reduceByKey has something to merge.
sales_data = sc.parallelize([
    ("Apple", 50),
    ("Banana", 30),
    ("Apple", 20),
    ("Orange", 40),
    ("Banana", 25),
])

In [None]:
# @title
# Total the sale amounts per fruit: reduceByKey combines the values of each key pairwise.
total_sales = sales_data.reduceByKey(lambda running_total, amount: running_total + amount)
total_sales.collect()

[('Apple', 70), ('Orange', 40), ('Banana', 55)]

### Extracting Top Elements

In [None]:
# (word, count) pairs; "apple" and "banana" each appear twice.
word_counts = sc.parallelize([
    ("apple", 10),
    ("banana", 5),
    ("orange", 8),
    ("apple", 15),
    ("banana", 12),
])

In [None]:
# @title
# top(n, key=...) returns the n elements with the largest key, biggest first.
# It ranks individual (word, count) pairs; duplicate words are NOT summed first.
top_words = word_counts.top(2, key=lambda pair: pair[1])
top_words

### Filter

In [None]:
# @title
# Keep only the numbers 1..10 that are not multiples of 3.
rdd = sc.parallelize(list(range(1, 11)))
filtered_rdd = rdd.filter(lambda value: value % 3 != 0)
filtered_rdd.collect()

These examples demonstrate the versatility of PySpark's RDD functions in various data processing scenarios.

# DataFrame Functions

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the SparkSession that the DataFrame examples run on.
spark = (
    SparkSession.builder
    .appName("example_app")
    .getOrCreate()
)

In [None]:
# Column layout: two nullable string columns followed by a nullable integer price.
schema = StructType([
    StructField("category", StringType(), nullable=True),
    StructField("product", StringType(), nullable=True),
    StructField("price", IntegerType(), nullable=True),
])

# Sample rows, two products per category, in (category, product, price) order.
data = [
    ("Electronics", "Laptop", 1200),
    ("Electronics", "Smartphone", 800),
    ("Clothing", "T-Shirt", 20),
    ("Clothing", "Jeans", 50),
    ("Books", "Fiction", 15),
    ("Books", "Non-Fiction", 25),
]

# Turn the rows into a DataFrame with the explicit schema, then print it.
raw_df = spark.createDataFrame(data, schema)
raw_df.show()

### Grouping and Aggregation

In [None]:
# @title
# Average price per category. The aggregate must target the numeric 'price'
# column: 'product' holds product names, so averaging it cannot yield a price.
avg_prices_df = raw_df.groupBy('category').agg(
    f.avg('price').alias('avg_price')
)
avg_prices_df.show()

### Sorting






In [None]:
# @title
# Sort rows by price, cheapest first. orderBy() is an alias of sort() in PySpark,
# so raw_df.sort(...) gives the same result.
sorted_df = raw_df.orderBy(f.col('price').asc())
sorted_df.show()

### Selection

In [None]:
# @title
# Project the DataFrame down to just the category and product columns.
selected_df = raw_df.select(f.col('category'), f.col('product'))
selected_df.show()

# Other Functions

### Split

In [None]:
# Short space-separated phrases used by the split / flatMap examples.
data = sc.parallelize([
    "Hello World",
    "Python Programming",
    "Big Data",
])

In [None]:
# @title
# map() emits exactly one output per input, so each phrase becomes a list of its words.
split_data = data.map(lambda phrase: phrase.split(' '))
split_data.collect()

### flatMap

In [None]:
# @title
# flatMap() flattens each phrase's word list, yielding one RDD element per word.
split_data = data.flatMap(lambda phrase: phrase.split(' '))
split_data.collect()

### partitionBy

In [None]:
# @title
# Key each word by its first letter, producing (letter, word) pairs.
data = (
    sc.parallelize(["apple", "banana", "orange", "car", "boat", "bike", "sun", "moon", "star"])
    .map(lambda word: (word[0], word))
)

In [None]:
# Redistribute the keyed pairs into 3 partitions by key; glom() turns each
# partition into a list so the grouping is visible after collect().
partitioned_data = data.partitionBy(numPartitions=3)
partitioned_data.glom().collect()