In [0]:
from pyspark.sql import SparkSession

In [0]:
# Obtain the SparkSession used by the rest of the notebook, labelled "SparkCluster".
spark = (
    SparkSession.builder
    .appName("SparkCluster")
    .getOrCreate()
)

In [0]:
# %sh
# cp course-master-big-data-with-pyspark-and-aws/Code/03-Spark\ DFs/StudentData.csv ./

In [0]:
%sh
# List the contents of the shell's current working directory.
ls

In [0]:
# Load the student CSV: the first row supplies the column names and the
# column types are inferred from the data.
df = spark.read.csv(
    "/FileStore/tables/StudentData.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# %sh
# ls /dbfs/FileStore/tables/

In [0]:
# Print the leading rows of the DataFrame as a plain-text table.
df.show()

In [0]:
# Render the same DataFrame through the notebook-provided `display` helper
# (not imported in this file; presumably the Databricks built-in - confirm).
display(df)

In [0]:
# Print every column's name, type and nullability as a tree.
df.printSchema()

In [0]:
# Read the student CSV with header handling, schema inference and an explicit
# comma delimiter, each passed as a string-valued reader option.
df1 = (
    spark.read
    .option("inferSchema", "True")
    .option("header", "True")
    .option("delimiter", ",")
    .csv("/FileStore/tables/StudentData.csv")
)
df1.printSchema()

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Explicit schema for the student data: (column name, Spark type) pairs in
# column order; every field is declared nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("age", IntegerType()),
        ("gender", StringType()),
        ("name", StringType()),
        ("course", StringType()),
        ("roll", StringType()),
        ("marks", IntegerType()),
        ("email", StringType()),
    ]
])



In [0]:
df_schema = spark.read \
            .options(header = 'True') \
            .schema(schema) \
            .csv("/FileStore/tables/StudentData.csv")

In [0]:
df_schema.printSchema()

In [0]:
from pyspark.sql import SparkSession
# NOTE(review): a session was already obtained earlier in this notebook, so
# this call presumably hands back that same session - confirm whether the new
# application name is actually needed.
spark = (
    SparkSession.builder
    .appName("SparkClustering")
    .getOrCreate()
)

In [0]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("SparkClustering")
sc = SparkContext.getOrCreate(conf=conf)

rdd = sc.textFile("/FileStore/tables/StudentData.csv")
headers = rdd.first()
rdd.filter(lambda x : x!= headers).map(lambda x : x.split(",")).collect()

In [0]:
rdd = rdd.filter(lambda x : x != headers).map(lambda x : x.split(","))
rdd = rdd.map(lambda x: [int(x[0]), x[1], x[2], x[3], x[4], int(x[5]), x[6]])

rdd.collect()

In [0]:
# Reuse the CSV header line as the list of column names.
columns = headers.split(",")
# Convert the parsed RDD to a DataFrame; only names are supplied here, no types.
df_rdd = rdd.toDF(columns)

df_rdd.show()

In [0]:
# Inspect which column types the DataFrame ended up with.
df_rdd.printSchema()

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Explicit schema for the RDD-backed DataFrame, built field by field in
# column order; every field is declared nullable.
schema = (
    StructType()
    .add("age", IntegerType(), True)
    .add("gender", StringType(), True)
    .add("name", StringType(), True)
    .add("course", StringType(), True)
    .add("roll", StringType(), True)
    .add("marks", IntegerType(), True)
    .add("email", StringType(), True)
)

In [0]:
# Build the DataFrame from the parsed RDD again, this time with the explicit
# `schema` so column names and types are declared rather than derived.
df_rdd = spark.createDataFrame(rdd, schema)


In [0]:
df_rdd.show()

In [0]:
# Verify the schema matches the declared StructType.
df_rdd.printSchema()

In [0]:
# Select columns by name (plain strings).
df.select("gender","name").show()

In [0]:
# Select columns through attribute access on the DataFrame.
df.select(df.marks,df.email).show()

In [0]:
from pyspark.sql.functions import col

# Select columns through col() expressions, which are not tied to a specific DataFrame.
df.select(col("marks"), col("name")).show()

In [0]:
# "*" selects every column.
df.select("*").show()

In [0]:
# Column names of the DataFrame, as a Python list of strings.
df.columns

In [0]:
# Alternative (disabled): select only the first three columns via list slicing.
# df.select(df.columns[:3]).show()

# Alternative (disabled): select a single column by its position in the list.
# df.select(df.columns[2]).show()

# Passing the full list of names selects every column.
df.select(df.columns).show()

In [0]:
# Schema before the cast, for comparison with the output of the next cell.
df.printSchema()

In [0]:
# Replace `roll` with a string-typed version of itself (same column name),
# then print the schema again to show the new type.
df = df.withColumn("roll",col("roll").cast("string"))
df.printSchema()

In [0]:
# Show `marks` increased by 10 by overwriting the existing column (display
# only; `df` itself is not reassigned). The target name was previously
# misspelled "maks", which added a stray extra column instead of updating `marks`.
df.withColumn("marks", col("marks") + 10).show()

In [0]:
# Add a new column derived from an existing one; `marks` itself is left as is.
df.withColumn("Updated marks", col("marks") - 10).show()

In [0]:
from pyspark.sql.functions import col, lit

# lit() wraps a constant so the same value is used for every row.
df.withColumn("Country", lit("USA")).show()

In [0]:
# Chained withColumn calls are applied in order: `marks` is reduced by 10 first,
# so "Updated Marks" is computed from the already-reduced value (original - 20).
df.withColumn("marks", col("marks") - 10) \
    .withColumn("Updated Marks", col("marks") - 10) \
        .withColumn("Country", lit("USA")).show()

In [0]:
# Rename two columns; all other columns are kept unchanged in the result.
df.withColumnRenamed("gender", "sex") \
    .withColumnRenamed("roll", "roll_number").show()

In [0]:
# alias() renames within a select, so only the selected column appears in the result.
df.select(col("roll").alias("roll_number")).show()

In [0]:
# Rows whose course is exactly 'DB'.
df.filter(df.course == 'DB').show()

In [0]:
# Combine conditions with & ; each comparison is wrapped in its own parentheses.
df.filter( (col("course") == 'DB') & (df.marks > 50) ).show()

In [0]:
courses = ['DB','DSA','OOP']

# Rows whose course is any of the listed values.
df.filter(df.course.isin(courses)).show()

In [0]:
# Course names beginning with 'M'.
df.filter(df.course.startswith('M')).show()

In [0]:
# Course names ending with 'B'.
df.filter(df.course.endswith('B')).show()

In [0]:
# Names containing the substring 'se'.
df.filter(df.name.contains('se')).show()

In [0]:
# SQL LIKE pattern: % is a wildcard, so this matches any course containing 'D'.
df.filter(df.course.like("%D%")).show()

## Quiz

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType
from pyspark.sql.functions import lit, col, round

In [0]:
spark = (
    SparkSession.builder
    .appName("Spark Typo")
    .getOrCreate()
)

# Load the student CSV for the quiz section, with header handling and schema
# inference requested through string-valued reader options.
quiz_df = (
    spark.read
    .option("inferSchema", "True")
    .option("header", "True")
    .csv("/FileStore/tables/StudentData.csv")
)

In [0]:
# Schema as loaded, before any column is re-typed.
quiz_df.printSchema()

In [0]:
# Replace `roll` with a string-typed version of itself, then re-check the schema.
quiz_df = quiz_df.withColumn("roll", col("roll").cast(StringType()))
quiz_df.printSchema()

In [0]:
# Select every column of quiz_df using its own column list. The list was
# previously taken from the unrelated `df`, which made this cell depend on
# another DataFrame having been loaded with the same columns.
quiz_df.select(quiz_df.columns).show()

In [0]:
total_marks = 120

# "Avg marks" holds each student's marks as a percentage of total_marks,
# rounded to 0 decimal places. `round` is the Spark SQL function imported from
# pyspark.sql.functions, not the Python built-in.
quiz_df = quiz_df.withColumn(
    "Avg marks",
    round(col("marks") / total_marks * 100, 0),
)

In [0]:
# Current column names, including the newly added "Avg marks".
quiz_df.columns

In [0]:
# Replace the space with an underscore so the column can be used with
# attribute access (quiz_df.Avg_marks) in the cells below.
quiz_df = quiz_df.withColumnRenamed("Avg marks","Avg_marks")

In [0]:
quiz_df.show()

In [0]:
df_80 = quiz_df.filter( (quiz_df.Avg_marks > 80) & (quiz_df.course == 'OOP'))

In [0]:
df_80.show()

In [0]:
df_60 = quiz_df.filter( (quiz_df.Avg_marks > 60) & (quiz_df.course == 'Cloud'))
df_60.show()

In [0]:
# Total number of rows in quiz_df.
quiz_df.count()

In [0]:
# Number of rows in df whose course is 'DB'.
df.filter(df.course == 'DB').count()

In [0]:
# Unique course values.
df.select("course").distinct().show()

In [0]:
# Keep one full row per (gender, course) combination; all columns are retained.
df.dropDuplicates(["gender","course"]).show()

In [0]:
# Number of rows per course.
df.groupBy("course").count().show()

In [0]:
# Unique (age, gender, course) combinations; only these three columns are returned.
df.select("age","gender","course").distinct().show()

In [0]:
# Sort by age, then marks; the single ascending=False flag applies to both
# sort keys, so both are in descending order.
df.sort("age", "marks", ascending = False).show()

In [0]:
# Per-column direction: highest marks first, ties ordered by ascending age.
df.sort(df.marks.desc(), df.age.asc()).show()