<!-- use this command in cmd - spark-shell -->

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark


In [3]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("MatrixMultiplication") \
    .getOrCreate()

# Sample matrices
matrix1 = [
    (0, 0, 2),
    (0, 1, 3),
    (1, 0, 4),
    (1, 1, 5)
]

matrix2 = [
    (0, 0, 6),
    (0, 1, 7),
    (1, 0, 8),
    (1, 1, 9)
]

# Create RDDs from the matrices
matrix1_rdd = spark.sparkContext.parallelize(matrix1)
matrix2_rdd = spark.sparkContext.parallelize(matrix2)

# Perform matrix multiplication using map-reduce
result_rdd = matrix1_rdd.flatMap(lambda x: [((x[0], y[1]), x[2] * y[2]) for y in matrix2 if x[1] == y[0]]). \
    reduceByKey(lambda x, y: x + y)


# Convert RDD to DataFrame
result_df = spark.createDataFrame(result_rdd.map(lambda x: (x[0][0], x[0][1], x[1])), ["row", "col", "result"])

# Display the result
result_df.show()

# Stop SparkSession
spark.stop()


24/04/18 13:24:48 WARN Utils: Your hostname, pl1-HP-280-Pro-G6-Microtower-PC resolves to a loopback address: 127.0.1.1, but we couldn't find any external IP address!
24/04/18 13:24:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/18 13:24:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/18 13:24:49 WARN MacAddressUtil: Failed to find a usable hardware address from the network interfaces; using random bytes: 50:b3:4f:1f:a9:62:e0:67
24/04/18 13:24:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/04/18 13:24:49 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
                                                                                

+---+---+------+
|row|col|result|
+---+---+------+
|  0|  0|    36|
|  1|  0|    64|
|  1|  1|    73|
|  0|  1|    41|
+---+---+------+



In [4]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("StudentGrades") \
    .getOrCreate()

# Sample student scores
scores = [
    ("Alice", {"Math": 85, "Science": 90, "English": 80}),
    ("Bob", {"Math": 70, "Science": 75, "English": 85}),
    ("Charlie", {"Math": 60, "Science": 65, "English": 70}),
    ("David", {"Math": 90, "Science": 95, "English": 85}),
    ("Eve", {"Math": 75, "Science": 80, "English": 75})
]

# Create RDD from the scores
scores_rdd = spark.sparkContext.parallelize(scores)

# Define the grading scheme (example)
grading_scheme = {
    "A": (80, 100),
    "B": (60, 79),
    "C": (40, 59),
    "D": (0, 39)
}

# Function to compute grades for a given score
def compute_grade(score):
    for grade, (lower_bound, upper_bound) in grading_scheme.items():
        if lower_bound <= score <= upper_bound:
            return grade
    return "F"

# Map operation to compute grades for each student
grades_rdd = scores_rdd.map(lambda x: (x[0], {subject: compute_grade(score) for subject, score in x[1].items()}))

# Convert RDD to DataFrame
grades_df = spark.createDataFrame(grades_rdd.flatMap(lambda x: [(x[0], subject, grade) for subject, grade in x[1].items()]), ["Student", "Subject", "Grade"])

# Display the result
grades_df.show()

# Stop SparkSession
spark.stop()


24/04/18 13:26:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/04/18 13:26:15 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


+-------+-------+-----+
|Student|Subject|Grade|
+-------+-------+-----+
|  Alice|   Math|    A|
|  Alice|Science|    A|
|  Alice|English|    A|
|    Bob|   Math|    B|
|    Bob|Science|    B|
|    Bob|English|    A|
|Charlie|   Math|    B|
|Charlie|Science|    B|
|Charlie|English|    B|
|  David|   Math|    A|
|  David|Science|    A|
|  David|English|    A|
|    Eve|   Math|    B|
|    Eve|Science|    A|
|    Eve|English|    B|
+-------+-------+-----+



In [5]:
from pyspark.sql import SparkSession
import seaborn as sns

# Create a SparkSession
spark = SparkSession.builder \
    .appName("TitanicAnalysis") \
    .getOrCreate()

# Load the Titanic dataset
titanic_df = spark.createDataFrame(sns.load_dataset("titanic"))

titanic_df = titanic_df.fillna({'Age': 0})

# Filter data for male passengers who died and remove null values from Age column
male_deceased = titanic_df.filter((titanic_df["Sex"] == "male") & (titanic_df["Survived"] == 0) & titanic_df["Age"].isNotNull())

# Check if there are any male passengers who died
male_deceased_count = male_deceased.count()

if male_deceased_count > 0:
    # Calculate the average age of male passengers who died
# Calculate the average age of male passengers who died
    male_deceased_age_avg = male_deceased.agg({"Age": "avg"}).collect()[0][0]
    print("Number of male passengers who died:", male_deceased_count)
    print("Average age of male passengers who died:", male_deceased_age_avg)
else:
    print("No male passengers found who died in the dataset.")
    
female_deceased_by_class = titanic_df.filter((titanic_df["Sex"] == "female") & (titanic_df["Survived"] == 0)).groupBy("Pclass").count()

# Display results
print("Number of deceased passengers in each class among females:")
female_deceased_by_class.show()
# Stop SparkSession
spark.stop()


24/04/18 11:25:40 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
                                                                                

Number of male passengers who died: 468
Average age of male passengers who died: 24.321581196581196
Number of deceased passengers in each class among females:
+------+-----+
|Pclass|count|
+------+-----+
|     3|   72|
|     2|    6|
|     1|    3|
+------+-----+

