In [1]:
! sudo apt-get update
! sudo apt-get install -y openjdk-11-jdk

Get:1 http://deb.debian.org/debian buster InRelease [122 kB]
Get:2 http://deb.debian.org/debian-security buster/updates InRelease [34.8 kB]
Get:3 http://deb.debian.org/debian buster-updates InRelease [56.6 kB]
Get:4 http://deb.debian.org/debian buster/main amd64 Packages [7,911 kB]
Get:5 http://deb.debian.org/debian-security buster/updates/main amd64 Packages [338 kB]
Get:6 http://deb.debian.org/debian buster-updates/main amd64 Packages [8,788 B]
Fetched 8,470 kB in 5s (1,835 kB/s)




The following additional packages will be installed:
  at-spi2-core ca-certificates-java dbus dbus-user-session
  dconf-gsettings-backend dconf-service dmsetup fonts-dejavu-extra
  glib-networking glib-networking-common glib-networking-services
  gsettings-desktop-schemas java-common libapparmor1 libargon2-1 libasound2
  libasound2-data libatk-bridge2.0-0 libatk-wrapper-java
  libatk-wrapper-java-jni libatspi2.0-0 libcap2 libcolord2 libcryptsetup12
  libdconf1 libdevmapper1.02.1 libdrm-amdgpu1 libdrm-com

In [2]:
import pyspark

pyspark.__version__

'3.3.0'

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .appName(name="How to Validate data/column in pyspark")\
        .master(master="local[*]")\
        .enableHiveSupport()\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/06 13:31:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## How to handle Bad Records in spark and those types

    - There are three types of modes available while reading and creating dataframe
    - Dealing with bad Records,Verify correctness of data When reading CSV files with specified Schema,
      it is possible that the data in the file does not match the schema. 

In [None]:

df = spark.read.option("mode","PERMISSIVE").csv("/work/datahandling.csv",sep=",",header=True,inferSchema=True)
df.show()

+--------------------+--------+------+------+--------+
|                  id|    name|salary|gender|     loc|
+--------------------+--------+------+------+--------+
|                   1|    amit|  3000|     m|   delhi|
|                   2| chandan|  5000|     m|   delhi|
|                   3|   sumit|  6000|     m|  mumbai|
|                   4|abhishek|  8000|     m|jharknad|
|                   5|    sonu|  4500|     m|banglore|
|                   6|yashvant|  5500|     m|hydrabad|
|7 aryan 6000 m patna|    null|  null|  null|    null|
|8 vikask 5000 m u...|    null|  null|  null|    null|
|9 pankaj 9000 m p...|    null|  null|  null|    null|
|10 babita 2000 f ...|    null|  null|  null|    null|
+--------------------+--------+------+------+--------+



In [None]:
spark.read.text("/work/datahandling.csv").show(truncate=False)

+----------------------------+
|value                       |
+----------------------------+
|id,name,salary,gender,loc   |
|1,amit,3000,m,delhi         |
|2,chandan,5000,m,delhi      |
|3,sumit,6000,m,mumbai       |
|4,abhishek,8000,m,jharknad  |
|5,sonu,4500,m,banglore      |
|6,yashvant,5500,m,hydrabad  |
|7 aryan 6000 m patna        |
|8 vikask 5000 m uttarpradesh|
|9 pankaj 9000 m patna       |
|10 babita 2000 f hydrabad   |
+----------------------------+



- To include this data in a seperate column
- As per the user case, If a user wants us to store a bad record in a seperate column 
   use option mode is PERMISSIVE
- option("columnNameOfCorruptRecord", "_corrupt_record")

In [None]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType

schema = StructType([
    StructField("id",IntegerType(),True),
    StructField("name",StringType(),True),
    StructField("salary",IntegerType(),True),
    StructField("gender",StringType(),True),
    StructField("loc",StringType(),True),
    StructField("_corrupt_record",StringType())
])


df = spark.read\
    .schema(schema)\
    .option("columnNameOfCorruptRecord","_corrupt_record")\
    .option("mode","PERMISSIVE")\
    .csv("/work/datahandling.csv",sep=",",header=True,inferSchema=True)

#df.unpersist()

corrupted_records = df.filter("_corrupt_record is not null").select("_corrupt_record")

corrupted_records.show(truncate=False)

+----------------------------+
|_corrupt_record             |
+----------------------------+
|7 aryan 6000 m patna        |
|8 vikask 5000 m uttarpradesh|
|9 pankaj 9000 m patna       |
|10 babita 2000 f hydrabad   |
+----------------------------+



In [None]:
# DROPMALFORMED  will ignore all corrupted records

df1 = spark.read.option("mode","DROPMALFORMED").csv("/work/datahandling.csv",sep=",",header=True,inferSchema=True)
df1.show()

+---+--------+------+------+--------+
| id|    name|salary|gender|     loc|
+---+--------+------+------+--------+
|  1|    amit|  3000|     m|   delhi|
|  2| chandan|  5000|     m|   delhi|
|  3|   sumit|  6000|     m|  mumbai|
|  4|abhishek|  8000|     m|jharknad|
|  5|    sonu|  4500|     m|banglore|
|  6|yashvant|  5500|     m|hydrabad|
+---+--------+------+------+--------+



In [None]:
df= df.drop("_corrupt_record").na.drop("all")

df.show()

+---+--------+------+------+--------+
| id|    name|salary|gender|     loc|
+---+--------+------+------+--------+
|  1|    amit|  3000|     m|   delhi|
|  2| chandan|  5000|     m|   delhi|
|  3|   sumit|  6000|     m|  mumbai|
|  4|abhishek|  8000|     m|jharknad|
|  5|    sonu|  4500|     m|banglore|
|  6|yashvant|  5500|     m|hydrabad|
+---+--------+------+------+--------+



#### In the Databricks side we can use option("badRecordsPath","/tmp/badrecords/") and know cause 

df = spark.read.option("badRecordsPath","/tmp/badrecords")\
.csv("/work/datahandling.csv",sep=",",header=True,inferSchema=True)

display(df)


display(spark.read.json("dbfs:/tmp/20220706T160111/bad_records/part-00000-...."))




In [None]:
# FAILFAST  Throws an exception when it meets corruptrd records

# df2 = spark.read.option("mode","FAILFAST").csv("/work/datahandling.csv",sep=",",header=True,inferSchema=True)
# df2.show()

In [None]:
# globles is pedefined inbuild function it will give you list of all objects , objects means 
# methods,variables,functions,DataFrames,everything it will give you k,v pair dataset.
#globals()

In [None]:
# How to get all avilable DataFrames in pyspark

from pyspark.sql import DataFrame

# Using items() will get all items sep by (k,v) and filter using DataFrame Function 

[k for (k,v) in globals().items() if isinstance(v,DataFrame)]

['df', '_15', 'df1', 'df2', 'corrupted_records']

In [None]:
# How to track add source file name in one of the column in dataframe?
from pyspark.sql.functions import input_file_name

df = spark.read.csv("/work/Datasets/BMW_carprices.csv",header=True,inferSchema=True)

df.withColumn("filePath",input_file_name()).show(truncate=False)

+-------+--------+-------------+---------------------------------------+
|Mileage|Age(yrs)|Sell Price($)|filePath                               |
+-------+--------+-------------+---------------------------------------+
|69000  |6       |18000        |file:///work/Datasets/BMW_carprices.csv|
|35000  |3       |34000        |file:///work/Datasets/BMW_carprices.csv|
|57000  |5       |26100        |file:///work/Datasets/BMW_carprices.csv|
|22500  |2       |40000        |file:///work/Datasets/BMW_carprices.csv|
|46000  |4       |31500        |file:///work/Datasets/BMW_carprices.csv|
|59000  |5       |26750        |file:///work/Datasets/BMW_carprices.csv|
|52000  |5       |32000        |file:///work/Datasets/BMW_carprices.csv|
|72000  |6       |19300        |file:///work/Datasets/BMW_carprices.csv|
|91000  |8       |12000        |file:///work/Datasets/BMW_carprices.csv|
|67000  |6       |22000        |file:///work/Datasets/BMW_carprices.csv|
|83000  |7       |18700        |file:///work/Datase

In [None]:
# Get number of rows on each file in a DataFrame?

df = spark.read.csv("/work/Datasets/*.csv",header=True,inferSchema=True)

df.withColumn("file_name",input_file_name()).groupBy("file_name").count().show(truncate=False)

+-----------------------------------------------------+-----+
|file_name                                            |count|
+-----------------------------------------------------+-----+
|file:///work/Datasets/salaries.csv                   |16   |
|file:///work/Datasets/titanic.csv                    |891  |
|file:///work/Datasets/prediction.csv                 |13   |
|file:///work/Datasets/BMW_carprices.csv              |20   |
|file:///work/Datasets/homeprices_categorical.csv     |13   |
|file:///work/Datasets/insurance_data.csv             |27   |
|file:///work/Datasets/user_detail_pipe_delimiter.csv |6    |
|file:///work/Datasets/areas.csv                      |13   |
|file:///work/Datasets/homeprices2.csv                |5    |
|file:///work/Datasets/user_detail_comma_delimiter.csv|6    |
|file:///work/Datasets/homeprices.csv                 |5    |
|file:///work/Datasets/carprices.csv                  |10   |
+-----------------------------------------------------+-----+



## How to add partition_id in DataFrame ?

In [None]:
from pyspark.sql.functions import spark_partition_id

df.withColumn("partition_id",spark_partition_id()).select("partition_id").distinct().show()

+------------+
|partition_id|
+------------+
|           0|
|           1|
+------------+



## How to get row count by partitionid in DataFrame ?

In [None]:
from pyspark.sql.functions import spark_partition_id

df.withColumn("partition_id",spark_partition_id()).groupBy("partition_id").count().show()

+------------+-----+
|partition_id|count|
+------------+-----+
|           0|  980|
|           1|   45|
+------------+-----+



### How to add Sequence generated surrogate key as a column in DataFrame

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

df = spark.read.csv("/work/Datasets/*.csv",header=True,inferSchema=True)

df.withColumn("key",monotonically_increasing_id()).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+---+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|key|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+---+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|  0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|  1|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  2|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|  3|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|  4|
|          6|       0|     3|    Moran, Mr. Jame

In [None]:
from pyspark.sql.functions import md5

df = spark.read.csv("/work/Datasets/*.csv",header=True,inferSchema=True)

df.withColumn("key",md5("PassengerId")).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|                 key|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|c4ca4238a0b923820...|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|c81e728d9d4c2f636...|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|eccbc87e4b5ce2fe2...|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|a87ff679a2f3e71d9...|
|          5|       0|     3|Allen, Mr. Willia..

### How to get list of databases,tables and columns using spark.catalog

In [None]:
print(spark.catalog.listDatabases())
print(spark.catalog.listTables("default"))
# print(spark.catalog.listColumns("default","tblname"))

[Database(name='default', description='Default Hive database', locationUri='file:/work/spark-warehouse')]
[]


# How partitions are created in spark ?

- Spark's tasks process data as partitions read from disk into memory.

In [None]:
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))

print(134217728/1024/1024)

134217728b
128.0


In [None]:
spark.conf.set("spark.sql.files.maxPartitionBytes","300MB")

rdd = spark.sparkContext.parallelize([1,2,3,4,5])

rdd.getNumPartitions()

2

In [None]:
spark.sparkContext.defaultParallelism

2

In [None]:
# The default shuffle partition will be 200 in spark

spark.conf.get("spark.sql.shuffle.partitions")

'200'

In [None]:
# You can control it by doing these configurations

spark.conf.set("spark.sql.shuffle.partitions",10)

spark.conf.get("spark.sql.shuffle.partitions")

'10'

In [None]:
df = spark.read.option("pathGlobFilter","*.json")\
          .csv("/work/Datasets")\
          .withColumn("filename",input_file_name())

df.select("filename").show(truncate=False)

+-------------------------------------------------+
|filename                                         |
+-------------------------------------------------+
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///work/Datasets/orders_sample_datasets.json|
|file:///wor

In [None]:
from pyspark.sql.functions import input_file_name
df = spark.read.option("recursiveFileLookup","true")\
    .csv("/work/*.csv", header=True,inferSchema=True)\
    .withColumn("filename",input_file_name())

df.select("filename").distinct().show(truncate=False)

+-----------------------------+
|filename                     |
+-----------------------------+
|file:///work/datahandling.csv|
+-----------------------------+



#### How to create Database DDL backup from internal Metastore(spark.catalog)

In [None]:
def func_create_ddl_backup(path):
    """ 
    this function we can use for creating delta lake or spark sql database tables backups.
    argument: path - you can pass storage location
    in databricks we have to give /foldername/ because python file can outside dbfs file
    call function : func_create_ddl_backup("/tmp/ddls/")
    """
    dbs = spark.catalog.listDatabases()
    for db in dbs:
        f = open("{}bkp_{}.sql".format(path,db.name), "w")
        tables = spark.catalog.listTables(db.name)
        for tbl in tables:
            DDL =spark.sql("SHOW CREATED TABLE {}.{};".format(db.name,tbl.name))
            f.write(DDL.first()[0])
            f.write("\n")
        f.close()

In [None]:
#func_create_ddl_backup("\tmp\ddls")

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=ae06d956-ba93-4d9c-ad4d-143cd9977dc8' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>