Create a DataFrame from an RDD using the reflection (schema-inference) method

Import findspark and initialize it with the Spark installation directory,
then import pyspark.

In [None]:
# findspark must be initialized with the Spark installation directory before
# pyspark is imported. Use SPARK_HOME when it is set so the notebook runs on
# other machines; otherwise fall back to the original hard-coded location.
import os
import findspark
findspark.init(os.environ.get('SPARK_HOME', '/usr/local/spark'))
import pyspark

Start a SparkSession.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark SQL example").getOrCreate()

In [None]:
#import collections

In [None]:
#collections.namedtuple

In [None]:
from pyspark.sql import Row

In [None]:
r1 = Row(name="John",Age=28)
print(r1.name)
print(r1.Age)
print(r1)

In [None]:
sc = spark.sparkContext

Create an RDD of text lines from the structured (tab-separated) text file.

In [None]:
clines = sc.textFile(name="customers.txt")  # one RDD element per line of the file

Transform this RDD of text lines into an RDD of `Row` objects, each Row holding the 5 tab-separated fields of one input line.

In [None]:
cfields = clines.map(lambda l: l.split("\t"))

In [None]:
cfields.take(2)

In [None]:
customers = cfields.map(lambda p: Row(cid=p[0],cname=p[1],ccity=p[2],cstate=p[3],czip=p[4]))

In [None]:
customers.take(2)

In [None]:
customers

Infer the schema by reflection from the Row objects to create a DataFrame. (It is registered as a temp view further below so it can be queried with SQL.)

In [None]:
customerDF = spark.createDataFrame(customers)

In [None]:
customerDF.printSchema()

In [None]:
customerDF.show(5)

In [None]:
customerDF.head()

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

In [None]:
df_indexed=customerDF.withColumn("index", monotonically_increasing_id())

In [None]:
df_indexed.printSchema()

In [None]:
df_indexed.show(3)

In [None]:
customerDF=df_indexed.filter(df_indexed.index!=0).drop("index")

In [None]:
customerDF.show(2)

In [None]:
customerDF.select("cname").show(5)

In [None]:
customerDF.select(customerDF['cname'], customerDF['ccity']).show(2)

In [None]:
customerDF.filter(customerDF['cstate'] == 'CA').select(customerDF['cname'],customerDF['cstate']).show(5)

In [None]:
customerDF.groupBy("cstate").count().show()

In [None]:
customerDF.show(20)

Create a temp view so that SQL queries can be run against the DataFrame.

In [None]:
customerDF.createOrReplaceTempView("customers")

In [None]:
cStateCount50 = spark.sql("SELECT cstate, count(*) as sttcount FROM customers GROUP BY cstate HAVING sttcount>=50")

In [None]:
type(cStateCount50)

In [None]:
cStateCount50.show()

In [None]:
cStateCount50.printSchema()

In [None]:
pwd

In [None]:
cStateCount50.write.save('mydata')

In [None]:
cStateCount50.write.parquet('state_ct1.parquet')

In [None]:
# Coalesce can reduce/merge the number of partitions to a few

In [None]:
cStateCount50.coalesce(2).write.parquet('state_ct1.parquet')

In [None]:
cStateCount50.coalesce(2).write.json('mydata.json')

In [None]:
type(cStateCount50)

In [None]:
spark.read.json('mydata.json').show()

In [None]:
spark.read.parquet('state_ct1.parquet').show()

In [None]:
x=sc.parallelize([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20], numSlices=3)

In [None]:
x.glom().collect()

In [None]:
# Coalesce cannot increase the number of partitions/it will only merge the nearest 
# partitions, it doesn't shuffle the data across the partitions
x.coalesce(2).glom().collect()

In [None]:
x.coalesce(4).glom().collect()

In [None]:
x.repartition(2).glom().collect()

In [None]:
x.repartition(5).glom().collect()

In [None]:
data = [(i,i%2) for i in range(1,1000001)]

In [None]:
rdd1 = spark.sparkContext.parallelize(data,8)

In [None]:
df = rdd1.toDF(["number", "mod_value"])

In [None]:
df.show(5)

In [None]:
df.rdd.getNumPartitions()

In [None]:
repartitioned_df=df.repartition(16,"mod_value")

In [None]:
repartitioned_df.rdd.getNumPartitions()

In [None]:
coalesced_df = df.coalesce(4)

In [None]:
coalesced_df.rdd.getNumPartitions()

In [None]:
repartitioned_df.groupBy("mod_value").count().show()

In [None]:
coalesced_df.groupBy("mod_value").count().show()

In [None]:
spark

In [None]:
# shuffling --> Achieve evenly distribution of the data in all the partitions

In [None]:
data = [(i,) for i in range(1,13)]

In [None]:
df = spark.createDataFrame(data, ['number']).repartition(4)

In [None]:
df.show()

In [None]:
df.withColumn("part_id", spark_partition_id()).show()

In [None]:
from pyspark.sql.functions import spark_partition_id

In [None]:
repart = df.repartition(3)

In [None]:
repart.withColumn("part_id", spark_partition_id()).show()

In [None]:
coalesced = df.coalesce(3)

In [None]:
coalesced.withColumn("part_id", spark_partition_id()).show()

In [None]:
[1,2,3] [4,5,6,7][8,9,10]

In [None]:
[1,2,3,4,5,6,7] [8,9,10]

In [None]:
[1,2,3][4,5,6]

In [None]:
sc.stop()

In [None]:
spark.stop()