In [36]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


In [37]:
df = spark.read.json('C:/Users/vrjav/Downloads/pyspark/learning1.ndjson')

In [38]:
df.cache()

DataFrame[age: bigint, name: string]

In [39]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [40]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [41]:
df.select('name').show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [42]:
df.createOrReplaceTempView("details")
spark.sql('select * from details where name="Michael"').show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
+----+-------+



# Generic Load and Save Functions

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [2]:
path = "C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/people.json"

df = spark.read.load(path, format='json')

In [3]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [4]:
import os

path = "C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources"

df1 = spark.read.load(os.path.join(path, "people.json"), format = 'json')
df1.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
df2 = spark.read.load(os.path.join(path, "users.parquet"))
df2.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          NULL|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



### Running SQL on files directly

In [18]:
df = spark.sql("select * from parquet.`C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/users.parquet`")

In [19]:
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          NULL|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [6]:
df = spark.read.load(os.path.join(path,"users.orc"), format='orc')

In [7]:
df_orc = df.write \
    .format("orc") \
    .mode("overwrite") \
    .save("users_without_bloom.orc")


In [8]:
df_bloom = df.write \
    .format("orc") \
    .option("orc.bloom.filter.columns", "favorite_color") \
    .mode("overwrite") \
    .save("users_with_bloom.orc")


In [9]:
df_orc = spark.read.format("orc").load("users_without_bloom.orc")
df_bloom = spark.read.format("orc").load("users_with_bloom.orc")


In [10]:
from pyspark.sql.functions import col
import time

# Time query function
def time_query(df):
    start = time.time()
    df.filter(col("favorite_color") == "blue").count()
    return time.time() - start

# Without Bloom filter
time_no_bloom = time_query(df_orc)

# With Bloom filter
time_with_bloom = time_query(df_bloom)

# Print the results
print(f"Time without Bloom filter: {time_no_bloom:.3f} seconds")
print(f"Time with Bloom filter: {time_with_bloom:.3f} seconds")


Time without Bloom filter: 1.999 seconds
Time with Bloom filter: 0.462 seconds


#### Generic Files Source Options

In [33]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

##### Data Source Option

In [34]:
df = spark.read.option("ignoreCorruptFiles", "true").parquet("C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/dir1", "C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/dir1/dir2")
df.show()

+-------------+
|         file|
+-------------+
|file1.parquet|
|file2.parquet|
+-------------+



##### Configuration

In [35]:
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
df0 = spark.read.parquet("C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/dir1", "C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/dir1/dir2")
df0.show()

+-------------+
|         file|
+-------------+
|file1.parquet|
|file2.parquet|
+-------------+



##### Recursive File Lookup
In here we don't need to mention subdirectories

In [36]:
df = spark.read.format("parquet").option("recursiveFileLookup", 'true').load("C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/dir1")
df.show()

+-------------+
|         file|
+-------------+
|file1.parquet|
|file2.parquet|
+-------------+



#### Data Source

##### Parquet


In [2]:
from pyspark.sql import  SparkSession
spark = SparkSession.builder.getOrCreate()

In [8]:
df_pq = spark.read.json("C:/Users/vrjav/Downloads/pyspark/learning1.ndjson")

In [9]:
df_pq.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [10]:
df_pq.write.parquet('people.parquet')

In [11]:
pqFile = spark.read.parquet('people.parquet')

In [15]:
pqFile.createOrReplaceTempView('example')
ex_pq = spark.sql('select * from example where age is Null')
ex_pq.show()
ex_pq.printSchema()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [24]:
from pyspark.sql import Row
sc = spark.sparkContext
sq_pq = spark.createDataFrame(sc.parallelize(range(1,6)).map(lambda i: Row(single=i, double=i**2)))
sq_pq.write.parquet('data/test_table/key=1')

In [25]:
cb_pq = spark.createDataFrame(sc.parallelize(range(6,11)).map(lambda i: Row(single=i, cube=i**3)))
cb_pq.write.parquet('data/test_table/key=2')

In [28]:
mergedf = spark.read.option('mergeSchema', 'true').parquet('data/test_table')
mergedf.show()
mergedf.printSchema()

+------+------+----+---+
|single|double|cube|key|
+------+------+----+---+
|     1|     1|NULL|  1|
|     2|     4|NULL|  1|
|     3|     9|NULL|  1|
|     4|    16|NULL|  1|
|     5|    25|NULL|  1|
|     6|  NULL| 216|  2|
|     7|  NULL| 343|  2|
|     9|  NULL| 729|  2|
|     8|  NULL| 512|  2|
|    10|  NULL|1000|  2|
+------+------+----+---+

root
 |-- single: long (nullable = true)
 |-- double: long (nullable = true)
 |-- cube: long (nullable = true)
 |-- key: integer (nullable = true)



##### ORC

In [5]:
from pyspark.sql import Row
sc = spark.sparkContext

orc_df = spark.createDataFrame(sc.parallelize(range(1,10)).map(lambda i: Row(single=i, power4=i**4)))
orc_df.write.orc('data/test_table/key=3', mode='overwrite')

In [6]:
df_orc = spark.read.orc('data/test_table/key=3')
df_orc.show()

+------+------+
|single|power4|
+------+------+
|     4|   256|
|     5|   625|
|     6|  1296|
|     7|  2401|
|     8|  4096|
|     9|  6561|
|     3|    81|
|     2|    16|
|     1|     1|
+------+------+



#### JSON

In [17]:
json_strings = ['{"name":"Vidyaranya", "address": {"city":"Columbus","state":"Ohio"}}']


json_df = sc.parallelize(json_strings)
people_df = spark.read.json(json_df)
people_df.select('name', 'address').show()

+----------+----------------+
|      name|         address|
+----------+----------------+
|Vidyaranya|{Columbus, Ohio}|
+----------+----------------+



In [20]:
from pyspark.sql.functions import col, concat_ws
p1_df = people_df.withColumn('address', concat_ws(", ", col('address.city'), col('address.state')))

p1_df.show()

p1_df = people_df.withColumn('address', concat_ws(", ", col('address.city'), col('address.state'))).select('name', 'address')
p1_df.show()


+--------------+----------+
|       address|      name|
+--------------+----------+
|Columbus, Ohio|Vidyaranya|
+--------------+----------+

+----------+--------------+
|      name|       address|
+----------+--------------+
|Vidyaranya|Columbus, Ohio|
+----------+--------------+



In [28]:
p1_df.createOrReplaceTempView('people')
s1 = spark.sql('select * from people where name != "Vidyaranya"')
s1.show()

+----+-------+
|name|address|
+----+-------+
+----+-------+



#### CSV

In [44]:
#withoutout any delimiter or option
csv = spark.read.csv('C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/people.csv')
csv.show()

#with delimiter in option
csv1 = spark.read.option("delimiter", ";").csv('C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/people.csv')
csv1.show()

#with multiple option 
csv2 = spark.read.option("delimiter",";").option('header','True').csv('C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/people.csv')
csv2.show()

#options instead of multiple option
csv3 = spark.read.options(delimiter=";", header = True).csv('C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/people.csv')
csv3.show()


+------------------+
|               _c0|
+------------------+
|      name;age;job|
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+

+-----+---+---------+
|  _c0|_c1|      _c2|
+-----+---+---------+
| name|age|      job|
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+



In [49]:
csv3.createOrReplaceTempView("employee")
csv_sql = spark.sql('select * from employee e1 full join employee e2 ')
csv_sql.show()

+-----+---+---------+-----+---+---------+
| name|age|      job| name|age|      job|
+-----+---+---------+-----+---+---------+
|Jorge| 30|Developer|Jorge| 30|Developer|
|Jorge| 30|Developer|  Bob| 32|Developer|
|  Bob| 32|Developer|Jorge| 30|Developer|
|  Bob| 32|Developer|  Bob| 32|Developer|
+-----+---+---------+-----+---+---------+



#### Text File

In [56]:
tf = spark.read.text('C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/people.txt')
tf.show()

tf1 = spark.read.option("linesep",',').text('C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/people.txt')
tf1.show()

tf2 = spark.read.text('C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/people.txt', lineSep=',')
tf2.show()

tf3 = spark.read.text('C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/people.txt',  wholetext=True)
tf3.show()

+-----------+
|      value|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+

+-----------+
|      value|
+-----------+
|    Michael|
|   29\nAndy|
| 30\nJustin|
|       19\n|
+-----------+

+-----------+
|      value|
+-----------+
|    Michael|
|   29\nAndy|
| 30\nJustin|
|       19\n|
+-----------+

+--------------------+
|               value|
+--------------------+
|Michael, 29\nAndy...|
+--------------------+



#### Hive

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


sc = spark.sparkContext
from pyspark.sql.functions import col, split, regexp_replace
kv1 = spark.read.options(delimiter="").csv("C:/spark/spark-3.5.1-bin-hadoop3/examples/src/main/resources/kv1.txt")
kv1.show(3)

kv1_updated = kv1.withColumn("_c1",  regexp_replace(col('_c1'), 'val_', ""))
kv1_updated.show(3) 



kv1_r1 = kv1.withColumnRenamed('_c0', 'Key').withColumnRenamed('_c1', 'val')
kv1_r1.head(5)

+---+-------+
|_c0|    _c1|
+---+-------+
|238|val_238|
| 86| val_86|
|311|val_311|
+---+-------+
only showing top 3 rows

+---+---+
|_c0|_c1|
+---+---+
|238|238|
| 86| 86|
|311|311|
+---+---+
only showing top 3 rows



[Row(Key='238', val='val_238'),
 Row(Key='86', val='val_86'),
 Row(Key='311', val='val_311'),
 Row(Key='27', val='val_27'),
 Row(Key='165', val='val_165')]

In [2]:
kv1_r1 = kv1_r1.withColumn('key', kv1_r1['key'].cast('int'))
kv1_r1.createOrReplaceTempView("values")
spark.sql("select * from values where key<100 order by cast(key as INT) ").show()

+---+------+
|key|   val|
+---+------+
|  0| val_0|
|  0| val_0|
|  0| val_0|
|  2| val_2|
|  4| val_4|
|  5| val_5|
|  5| val_5|
|  5| val_5|
|  8| val_8|
|  9| val_9|
| 10|val_10|
| 11|val_11|
| 12|val_12|
| 12|val_12|
| 15|val_15|
| 15|val_15|
| 17|val_17|
| 18|val_18|
| 18|val_18|
| 19|val_19|
+---+------+
only showing top 20 rows



In [3]:
kv1_r1.printSchema()

root
 |-- key: integer (nullable = true)
 |-- val: string (nullable = true)



In [None]:
from pyspark.sql.functions import split, col, regexp_replace
kv1_u = kv1_r1.withColumn('value_', regexp_replace(col('val'), "val_", "")).select("key", "val", "value_")

kv1_u.createOrReplaceTempView('example')
kv1_sql = spark.sql('select * from example where key = value_')
kv1_sql.show(10)

kv1_sql1 = spark.sql("select count(*) as `total number of rows` from example")
kv1_sql1.show()

+---+-------+------+
|key|    val|value_|
+---+-------+------+
|238|val_238|   238|
| 86| val_86|    86|
|311|val_311|   311|
| 27| val_27|    27|
|165|val_165|   165|
|409|val_409|   409|
|255|val_255|   255|
|278|val_278|   278|
| 98| val_98|    98|
|484|val_484|   484|
+---+-------+------+
only showing top 10 rows

+--------------------+
|total number of rows|
+--------------------+
|                 500|
+--------------------+



In [19]:
from pyspark.sql import Row
Record = Row('key', 'value')

df = spark.createDataFrame([Record(i, 'val_'+str(i)) for i in range(1,101)])
df.show()

+---+------+
|key| value|
+---+------+
|  1| val_1|
|  2| val_2|
|  3| val_3|
|  4| val_4|
|  5| val_5|
|  6| val_6|
|  7| val_7|
|  8| val_8|
|  9| val_9|
| 10|val_10|
| 11|val_11|
| 12|val_12|
| 13|val_13|
| 14|val_14|
| 15|val_15|
| 16|val_16|
| 17|val_17|
| 18|val_18|
| 19|val_19|
| 20|val_20|
+---+------+
only showing top 20 rows

