In [0]:
# Day 1: RDD and DataFrame

In [0]:
# Import Spark libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.rdd import RDD

In [0]:
# Build (or reuse) the SparkSession that the rest of the notebook refers to as `spark`.
# "local[1]" runs Spark locally with a single worker thread.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
)

In [0]:
## CREATING RDD

# Distribute an in-memory Python list as an RDD with SparkContext.parallelize().
dataList = [
    ("Java", 20000),
    ("Python", 100000),
    ("Scala", 3000),
]
rdd_1 = spark.sparkContext.parallelize(dataList)
# Printing an RDD shows its description (see the output below), not its elements.
print(rdd_1)

ParallelCollectionRDD[171] at readRDDFromInputStream at PythonRDD.scala:413


In [0]:
##FILES
#dbfs:/FileStore/shared_uploads/27prachisingh@gmail.com/creditcard-2.csv
#dbfs:/FileStore/shared_uploads/27prachisingh@gmail.com/example_1.json
#dbfs:/FileStore/shared_uploads/27prachisingh@gmail.com/textfile_test.txt
#dbfs:/FileStore/shared_uploads/27prachisingh@gmail.com/csvfile.csv
#df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/27prachisingh@gmail.com/csvfile.csv")
#dbfs:/FileStore/shared_uploads/27prachisingh@gmail.com/sampletxt.txt
#dbfs:/FileStore/shared_uploads/27prachisingh@gmail.com/blocks_0012738509_0012739509__3_.parquet

In [0]:
# Create an RDD from a text file; textFile() yields one element per line of the file.
rdd_2 = spark.sparkContext.textFile(
    "dbfs:/FileStore/shared_uploads/27prachisingh@gmail.com/sampletxt.txt"
)
#print(rdd_2)

In [0]:
# Conversion of RDD to DataFrame
# toDF() without arguments: column names default to _1, _2 and the
# column types are inferred from the tuples held by the RDD.
df_converted1 = rdd_1.toDF()

df_converted1.printSchema()
df_converted1.show(truncate=False)

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

+------+------+
|_1    |_2    |
+------+------+
|Java  |20000 |
|Python|100000|
|Scala |3000  |
+------+------+



In [0]:
# RDD -> DataFrame again, this time supplying explicit column names to toDF().
col_name = ["col1", "col2"]
df_converted2 = rdd_1.toDF(schema=col_name)

df_converted2.printSchema()
df_converted2.show(truncate=False)

root
 |-- col1: string (nullable = true)
 |-- col2: long (nullable = true)

+------+------+
|col1  |col2  |
+------+------+
|Java  |20000 |
|Python|100000|
|Scala |3000  |
+------+------+



In [0]:
%scala
// Convert an RDD to a Dataset.
// Variables created in Python cells (such as rdd_1) are not visible from a
// %scala cell, so the same sample data is rebuilt here as a Scala RDD.
import spark.implicits._ // supplies the Encoder[(String, Int)] that createDataset requires

val rdd_1 = spark.sparkContext.parallelize(
  Seq(("Java", 20000), ("Python", 100000), ("Scala", 3000))
)
val ds = spark.createDataset(rdd_1)

In [0]:
# Check whether rdd_1 is a DataFrame or an RDD (one boolean per line).
print(isinstance(rdd_1, DataFrame), isinstance(rdd_1, RDD), sep="\n")
# Print the concrete type of the RDD and of the DataFrame built from it.
print(type(rdd_1), type(df_converted2), sep="\n")

In [0]:
# createDataFrame(): build the DataFrame through the SparkSession instead of
# rdd.toDF(), passing the column names as the schema.
deptDF = spark.createDataFrame(data=rdd_1, schema=col_name)

deptDF.printSchema()
deptDF.show(truncate=False)

In [0]:
# Using createDataFrame() with an explicit StructType schema.
# Each field is declared as (name, type, nullable); col2 is forced to
# IntegerType here rather than the inferred long seen with toDF().
colSchema = (
    StructType()
    .add('col1', StringType(), True)
    .add('col2', IntegerType(), True)
)

df_converted3 = spark.createDataFrame(data=rdd_1, schema=colSchema)
df_converted3.printSchema()
df_converted3.show(truncate=False)

In [0]:
# Creating DataFrame

# 1. Create DataFrame from RDD (covered by the cells above)

# 2. Create DataFrame from List Collection

# 2.A Using createDataFrame() from SparkSession
columns = ["language", "users_count"]
data = [
    ("Java", 20000),
    ("Python", 100000),
    ("Scala", 3000),
]
# Let Spark infer the schema from the tuples, then rename the columns.
dfFromList = (
    spark.createDataFrame(data)
    .toDF(*columns)
)
dfFromList.printSchema()
dfFromList.show()

In [0]:
#2.B  Using createDataFrame() with the Row type
# Convert every tuple in `data` into a Row and keep the result as a real list.
# In Python 3, map() returns a one-shot iterator: after its first use it is
# exhausted, so any later reuse of rowData would see no rows at all.
rowData = [Row(*record) for record in data]
dfFromList1 = spark.createDataFrame(rowData, columns)
dfFromList1.printSchema()
dfFromList1.show()

In [0]:
# 2.C  Create DataFrame with schema
# Sample records: (firstname, lastname, id, salary)
data2 = [
    ("James", "Smith", "36636", 3000),
    ("Michael", "", "40288", 4000),
    ("Robert", "Williams", "42114", 4000),
    ("Maria", "Jones", "39192", 4000),
    ("Jen", "Brown", "", -1),
]

# Explicit schema: three nullable string columns and one nullable integer column.
data_schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("salary", IntegerType(), True),
])

dfFromList2 = spark.createDataFrame(data=data2, schema=data_schema)
dfFromList2.printSchema()
dfFromList2.show(truncate=False)

In [0]:
# 3. Create DataFrame from Data sources

# 3.A Creating DataFrame from CSV
# NOTE(review): no header option is passed, so the first line of the file is
# read as data. The commented-out reader in the FILES cell uses header=true
# for this same file - confirm which behaviour is intended.
df_csv = spark.read.csv(
    path="dbfs:/FileStore/shared_uploads/27prachisingh@gmail.com/csvfile.csv"
)
df_csv.printSchema()
df_csv.show()

In [0]:
# 3.B Creating DataFrame from a text (TXT) file; each line of the file becomes one row.
df_txt = spark.read.text(
    "dbfs:/FileStore/shared_uploads/27prachisingh@gmail.com/textfile_test.txt"
)
df_txt.printSchema()
df_txt.show()

In [0]:
# Creating DataFrame from a JSON file; only the inferred schema is printed.
df_JSON = spark.read.json(
    path="dbfs:/FileStore/shared_uploads/27prachisingh@gmail.com/example_1.json"
)
df_JSON.printSchema()
#df_JSON.show()

In [0]:
# Creating DataFrame from a Parquet file.
parDF = spark.read.parquet(
    "dbfs:/FileStore/shared_uploads/27prachisingh@gmail.com/blocks_0012738509_0012739509__3_.parquet"
)
parDF.show()

In [0]:
# map()
# getOrCreate() hands back the already-running session when one exists, so
# this rebinds `spark` rather than starting a second session.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("Spark_Practice")
    .getOrCreate()
)

# Words used to build the RDD.
data_rdd = [
    "Project", "Gutenberg’s", "Alice’s", "Adventures",
    "in", "Wonderland", "Project", "Gutenberg’s", "Adventures",
    "in", "Wonderland", "Project", "Gutenberg’s",
]

rdd_list = spark.sparkContext.parallelize(data_rdd)
# Pair every word with the value 1, producing (word, 1) tuples.
rdd2 = rdd_list.map(lambda word: (word, 1))
for element in rdd2.collect():
    print(element)

In [0]:
# Transformations on a DataFrame by mapping over its underlying RDD
data_df = [
    ('James', 'Smith', 'M', 30),
    ('Anna', 'Rose', 'F', 41),
    ('Robert', 'Williams', 'M', 62),
]

columns_df = ["firstname", "lastname", "gender", "salary"]
df_1 = spark.createDataFrame(data=data_df, schema=columns_df)
df_1.show()

# Variant 1: address the Row fields by position.
rdd_df_1 = df_1.rdd.map(
    lambda row: (row[0] + "," + row[1], row[2], row[3] * 2)
)
df_2 = rdd_df_1.toDF(["name", "gender", "new_salary"])
df_2.show()

# Variant 2: address the Row fields by column name (subscript access).
rdd_df_2_1 = df_1.rdd.map(
    lambda row: (row["firstname"] + "," + row["lastname"], row["gender"], row["salary"] * 2)
)
df_3 = rdd_df_2_1.toDF(["name", "gender", "new_salary"])
df_3.show()

# Variant 3: address the Row fields by column name (attribute access).
rdd_df_2_2 = df_1.rdd.map(
    lambda row: (row.firstname + "," + row.lastname, row.gender, row.salary * 2)
)
df_4 = rdd_df_2_2.toDF(["name", "gender", "new_salary"])
df_4.show()
#Using functions
def func1(x):
    """Turn one record into a (name, gender, salary) tuple.

    Args:
        x: Any object exposing ``firstname``, ``lastname``, ``gender`` and
            ``salary`` attributes (for example a Spark ``Row``).

    Returns:
        A 3-tuple of the first and last name joined by a comma, the gender
        in lower case, and the salary multiplied by two.
    """
    full_name = x.firstname + "," + x.lastname
    return (full_name, x.gender.lower(), x.salary * 2)

# Apply func1 to every row; the function is passed directly, no lambda wrapper needed.
rdd_3 = df_1.rdd.map(func1)
df_5 = rdd_3.toDF(["name", "gender", "new_salary"])
df_5.show()