In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("JoinExamples").getOrCreate()

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

schema2 = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("local", StringType(), True)
])

data1 = [(1, "Alice", 25, "New York"), (None, "Bob", 30, "London"), (3, "Charlie", 22, "Paris"), (4, "David", 28, "Tokyo")]
df1 = spark.createDataFrame(data1, schema=schema)

data2 = [(1, "Alice", 24, "New York","monday"), (5, "Eve", 24, "Sydney",None), (1, "Bob", 35, "London","Tuesday"), (None, "Frank", 35, "Berlin",None),(None, "kil", 35, "Berlin", 'sunday')]  # Note: Some overlap in 'id' and 'name'
df2 = spark.createDataFrame(data2, schema=schema2)

df1.show()
df2.show()

25/03/21 17:23:08 WARN Utils: Your hostname, codespaces-56288b resolves to a loopback address: 127.0.0.1; using 10.0.3.12 instead (on interface eth0)
25/03/21 17:23:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/21 17:23:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/21 17:23:33 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

+----+-------+---+--------+
|  id|   name|age|    city|
+----+-------+---+--------+
|   1|  Alice| 25|New York|
|NULL|    Bob| 30|  London|
|   3|Charlie| 22|   Paris|
|   4|  David| 28|   Tokyo|
+----+-------+---+--------+

+----+-----+---+--------+-------+
|  id| name|age|    city|  local|
+----+-----+---+--------+-------+
|   1|Alice| 24|New York| monday|
|   5|  Eve| 24|  Sydney|   NULL|
|   1|  Bob| 35|  London|Tuesday|
|NULL|Frank| 35|  Berlin|   NULL|
|NULL|  kil| 35|  Berlin| sunday|
+----+-----+---+--------+-------+



In [None]:
customer_filter = df1.filter((col('city') == 'New York') & (col('age') <= 30 ))
customer_filter.show()

inner_join = df1.join(df2, df1.id.eqNullSafe(df2.id), 'inner')
inner_join.show()

data = [
    ('raja', ['apple', 'orange']),
    ('shahid', ['banana','kiwi']),
    ('panneer', None)
]

df3 = spark.createDataFrame(data=data, schema=['name','fruits'])
df3.show()

df4 = df3.select(col('name'), explode_outer(col('fruits')))
df4.show()


df5 = df1.withColumn("country", when(col("city") == "Berlin", "Russia")
                     .when(col("city") == "New York", "USA")
                     .when(col("city") == "London","UK")
                     .when(col("city") == "Paris", "France")
                     .when(col("city") == "Tokyo", "Japan")
                     .otherwise("Third World Country"))

df5.show()

df6 = df5.withColumnRenamed("name", "cust_name")
df6.show()

df6 = df6.drop(col("cust_name"))
df6.show()


# Handling Bad Records in Pyspark 

+---+-----+---+--------+
| id| name|age|    city|
+---+-----+---+--------+
|  1|Alice| 25|New York|
+---+-----+---+--------+



                                                                                

+----+-----+---+--------+----+-----+---+--------+-------+
|  id| name|age|    city|  id| name|age|    city|  local|
+----+-----+---+--------+----+-----+---+--------+-------+
|NULL|  Bob| 30|  London|NULL|Frank| 35|  Berlin|   NULL|
|NULL|  Bob| 30|  London|NULL|  kil| 35|  Berlin| sunday|
|   1|Alice| 25|New York|   1|Alice| 24|New York| monday|
|   1|Alice| 25|New York|   1|  Bob| 35|  London|Tuesday|
+----+-----+---+--------+----+-----+---+--------+-------+

+-------+---------------+
|   name|         fruits|
+-------+---------------+
|   raja|[apple, orange]|
| shahid| [banana, kiwi]|
|panneer|           NULL|
+-------+---------------+

+-------+------+
|   name|   col|
+-------+------+
|   raja| apple|
|   raja|orange|
| shahid|banana|
| shahid|  kiwi|
|panneer|  NULL|
+-------+------+

+----+-------+---+--------+-------+
|  id|   name|age|    city|country|
+----+-------+---+--------+-------+
|   1|  Alice| 25|New York|    USA|
|NULL|    Bob| 30|  London|     UK|
|   3|Charlie| 22|

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/workspaces/AzureDataBricks/Filestore.

In [6]:
# performing re-partiton and coalesce 

df1.show()

df2 = df1.repartition(2)
df2.show()

sc.defaultParallelism

+----+-------+---+--------+
|  id|   name|age|    city|
+----+-------+---+--------+
|   1|  Alice| 25|New York|
|NULL|    Bob| 30|  London|
|   3|Charlie| 22|   Paris|
|   4|  David| 28|   Tokyo|
+----+-------+---+--------+

+----+-------+---+--------+
|  id|   name|age|    city|
+----+-------+---+--------+
|   1|  Alice| 25|New York|
|   4|  David| 28|   Tokyo|
|NULL|    Bob| 30|  London|
|   3|Charlie| 22|   Paris|
+----+-------+---+--------+



NameError: name 'sc' is not defined

In [3]:
# list of employee data
data = [["1001", "gaurav", "hyderabad","42000"],
["1002", "vijay", "hyderabad","45565"],
["1003", "akanksha","hyderabad", "52000"],
["1004", "niharika", "hyderabad","35000"]]
# specify column names
columns = ['id','name','location','salry']
# creating a dataframe from the lists of data
df_full = spark.createDataFrame(data, columns)
# list of employee data
data = [ ["1003", "akanksha","delhi", "65000"],
["1004", "niharika", "bihar","10000"],
["1005", "murali","vijaywada", "80000"],
["1002", "vijay", "hyderabad","45565"]
]
# specify column names
columns = ['id','name','location','salry']
# creating a dataframe from the lists of data
df_daily_update = spark.createDataFrame(data, columns)
print("Full data...")
df_full.show()
print("daily data...")
df_daily_update.show()

Full data...


+----+--------+---------+-----+
|  id|    name| location|salry|
+----+--------+---------+-----+
|1001|  gaurav|hyderabad|42000|
|1002|   vijay|hyderabad|45565|
|1003|akanksha|hyderabad|52000|
|1004|niharika|hyderabad|35000|
+----+--------+---------+-----+

daily data...
+----+--------+---------+-----+
|  id|    name| location|salry|
+----+--------+---------+-----+
|1003|akanksha|    delhi|65000|
|1004|niharika|    bihar|10000|
|1005|  murali|vijaywada|80000|
|1002|   vijay|hyderabad|45565|
+----+--------+---------+-----+

