In [1]:
import pyspark
import os
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

conf = (SparkConf()
       .set("spark.driver.bindAddress", "0.0.0.0")\
       .set("spark.submit.deployMode", "client")\
       .set("spark.kubernetes.driver.pod.name", os.getenv("MY_POD_NAME"))\
       .set("spark.kubernetes.authenticate.subdmission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")\
       .set("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token")\
       .set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-driver")\
       .set("spark.kubernetes.namespace", os.getenv("MY_POD_NAMESPACE"))\
       .set("spark.executor.instances", "1")\
       .set("spark.dynamicAllocation.maxExecutors", "2")\
       .set("spark.dynamicAllocation.enabled", "true")\
       .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")\
       .set("spark.driver.host", "spark-driver")\
       .set("spark.driver.port", "20020")\
       .set("spark.kubernetes.executor.request.cores", "1")\
       .set("spark.kubernetes.executor.limit.cores", "2")\
       .set("spark.executor.memory", "12000m")\
       .set("spark.kubernetes.container.image", "klovercloud/airflow-spark-k8s-driver:3.1.2")\
       .set("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")\
       .set("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")\
       .set("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog")\
       .set("spark.sql.catalog.spark_catalog.type","hive")\
       .set("spark.hadoop.hive.metastore.uris","thrift://hive-metastore.analytics-dev:9083")\
       .set("spark.hadoop.hive.metastore.warehouse.dir","s3a://bigdata-dev-cmfcknil/warehouse/")\
       .set("spark.hadoop.fs.s3a.connection.ssl.enabled","true")\
       .set("spark.hadoop.fs.s3a.path.style.access","true")\
       .set("spark.hadoop.fs.s3a.endpoint","https://kcs3.bd-1.wpc.waltonelectronics.com")\
       .set("spark.hadoop.fs.s3a.access.key","30J9IFFY75NVQZVHG4D4")\
       .set("spark.hadoop.fs.s3a.secret.key","WMGk6kqq5erjqZjnqo0QItSQ2zuoNWEpHIphsTgR")\
       .set("spark.hadoop.fs.s3a.attempts.maximum","1")\
       .set("spark.hadoop.fs.s3a.connection.establish.timeout","500")\
       .set("spark.hadoop.datanucleus.autoCreateSchema","true")\
       .set("spark.hadoop.datanucleus.fixedDatastore","false"));

spark = SparkSession.builder \
            .master("k8s://https://kubernetes.default:443") \
            .appName("spark-test-app") \
            .config(conf=conf).enableHiveSupport().getOrCreate()

22/01/31 11:05:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/31 11:05:54 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.


In [2]:
pqtDF=spark.read.parquet("s3a://bigdata-dev-cmfcknil/raw/idl/ProductRegistration/tblProductRegistration/dbo/*.parquet")

22/01/31 11:06:08 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [3]:
spark.sql("SHOW DATABASES;").show()

+---------+
|namespace|
+---------+
|  default|
|      ebs|
|      pos|
|   pqspos|
|      reg|
|    tmpdb|
+---------+



In [4]:
spark.sql("SHOW TABLES FROM reg;").show(200,False)

+--------+--------------------------------------+-----------+
|database|tableName                             |isTemporary|
+--------+--------------------------------------+-----------+
|reg     |dbo___apicurrentcredit                |false      |
|reg     |dbo___apifailedstatus                 |false      |
|reg     |dbo___apismslog                       |false      |
|reg     |dbo___blacklisteddealerphonenumbers   |false      |
|reg     |dbo___bumperofferdetails              |false      |
|reg     |dbo___bumperoffersmsmaster            |false      |
|reg     |dbo___computerproductoffer            |false      |
|reg     |dbo___deleted_bumperofferdetails      |false      |
|reg     |dbo___deleted_bumperoffersmsmaster    |false      |
|reg     |dbo___deleted_tblproductregistration  |false      |
|reg     |dbo___deleted_tbltvactivation         |false      |
|reg     |dbo___fixedoffervalues                |false      |
|reg     |dbo___offervaluetype                  |false      |
|reg    

In [5]:
pqtDF.count()

                                                                                

10624633

In [6]:
df=spark.sql("select * from reg.dbo___tblproductregistration")

In [7]:
df.count()

                                                                                

10680138

In [8]:
import pyspark.sql.functions as funcs
pqtDF = pqtDF.withColumn("date_of_registration",funcs.to_date(funcs.col("ProductRegistrationDate")))
df = df.withColumn("date_of_registration",funcs.to_date(funcs.col("ProductRegistrationDate")))

In [9]:
from pyspark.sql.functions import month

pqtDF = pqtDF.withColumn('Month',month(pqtDF.date_of_registration))
df = df.withColumn('Month',month(df.date_of_registration))




In [10]:
from pyspark.sql.functions import year

pqtDF = pqtDF.withColumn('Year',year(pqtDF.date_of_registration))
df = df.withColumn('Year',year(df.date_of_registration))

In [11]:
yearwiserowcount = pqtDF.cache()

22/01/30 10:46:50 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [15]:
yearwiserowcount.createOrReplaceTempView("yearcount")
df.createOrReplaceTempView("countyear")

In [16]:
year2017count = spark.sql("SELECT Year,count(*) FROM yearcount where Year=2017 group by Year")
year2017count.show()

+----+--------+
|Year|count(1)|
+----+--------+
|2017|  518623|
+----+--------+



In [17]:
year2017count = year2017count.withColumnRenamed("count(1)","totalrowcount2017")

In [18]:
df2017count = spark.sql("SELECT Year,count(*) FROM countyear where Year=2017 group by Year")
df2017count.show()

                                                                                

+----+--------+
|Year|count(1)|
+----+--------+
|2017|  518623|
+----+--------+



In [24]:
df2017count = df2017count.withColumnRenamed("count(1)","icebergrowcount2017")
df2017count = df2017count.withColumnRenamed("Year","bochor")

In [25]:
rowcheck17 = year2017count.join(df2017count,year2017count.totalrowcount2017 == df2017count.icebergrowcount2017,"inner")

In [26]:
rowcheck17.show()

                                                                                

+----+-----------------+------+-------------------+
|Year|totalrowcount2017|bochor|icebergrowcount2017|
+----+-----------------+------+-------------------+
|2017|           518623|  2017|             518623|
+----+-----------------+------+-------------------+



In [27]:
rowcheck17 = rowcheck17.drop("bochor")

In [28]:
rowcheck17.show()

                                                                                

+----+-----------------+-------------------+
|Year|totalrowcount2017|icebergrowcount2017|
+----+-----------------+-------------------+
|2017|           518623|             518623|
+----+-----------------+-------------------+



In [29]:
year2018count = spark.sql("SELECT Year,count(*) FROM yearcount where Year=2018 group by Year")


In [30]:
year2018count = year2018count.withColumnRenamed("count(1)","totalrowcount2018")

In [31]:
df2018count = spark.sql("SELECT Year,count(*) FROM countyear where Year=2018 group by Year")


In [33]:
df2018count = df2018count.withColumnRenamed("count(1)","icebergrowcount2018")
df2018count = df2018count.withColumnRenamed("Year","bochor")

In [35]:
rowcheck18 = year2018count.join(df2018count,year2018count.totalrowcount2018 == df2018count.icebergrowcount2018,"inner")

In [36]:
rowcheck18 = rowcheck18.drop("bochor")

In [37]:
rowcheck18.show()

                                                                                

+----+-----------------+-------------------+
|Year|totalrowcount2018|icebergrowcount2018|
+----+-----------------+-------------------+
|2018|          1955039|            1955039|
+----+-----------------+-------------------+



In [39]:
year2019count = spark.sql("SELECT Year,count(*) FROM yearcount where Year=2019 group by Year")

In [40]:
year2019count = year2019count.withColumnRenamed("count(1)","totalrowcount2019")

In [41]:
df2019count = spark.sql("SELECT Year,count(*) FROM countyear where Year=2019 group by Year")

In [42]:
df2019count = df2019count.withColumnRenamed("count(1)","icebergrowcount2019")
df2019count = df2019count.withColumnRenamed("Year","bochor")

In [44]:
rowcheck19 = year2019count.join(df2019count,year2019count.totalrowcount2019 == df2019count.icebergrowcount2019,"inner")

In [45]:
rowcheck19 = rowcheck19.drop("bochor")

In [46]:
rowcheck19.show()

                                                                                

+----+-----------------+-------------------+
|Year|totalrowcount2019|icebergrowcount2019|
+----+-----------------+-------------------+
|2019|          2716880|            2716880|
+----+-----------------+-------------------+



In [47]:
year2020count = spark.sql("SELECT Year,count(*) FROM yearcount where Year=2020 group by Year")


In [48]:
year2020count = year2020count.withColumnRenamed("count(1)","totalrowcount2020")

In [49]:
df2020count = spark.sql("SELECT Year,count(*) FROM countyear where Year=2020 group by Year")

In [50]:
df2020count = df2020count.withColumnRenamed("count(1)","icebergrowcount2020")
df2020count = df2020count.withColumnRenamed("Year","bochor")

In [52]:
rowcheck20 = year2020count.join(df2020count,year2020count.totalrowcount2020 == df2020count.icebergrowcount2020,"inner")

In [53]:
rowcheck20 = rowcheck20.drop("bochor")

In [54]:
rowcheck20.show()

                                                                                

+----+-----------------+-------------------+
|Year|totalrowcount2020|icebergrowcount2020|
+----+-----------------+-------------------+
|2020|          2275356|            2275356|
+----+-----------------+-------------------+



In [55]:
year2021count = spark.sql("SELECT Year,count(*) FROM yearcount where Year=2021 group by Year")


In [56]:
year2021count = year2021count.withColumnRenamed("count(1)","totalrowcount2021")

In [57]:
df2021count = spark.sql("SELECT Year,count(*) FROM countyear where Year=2021 group by Year")

In [58]:
df2021count = df2021count.withColumnRenamed("count(1)","icebergrowcount2021")
df2021count = df2021count.withColumnRenamed("Year","bochor")

In [59]:
rowcheck21 = year2021count.join(df2021count,year2021count.totalrowcount2021 == df2021count.icebergrowcount2021,"inner")

In [60]:
rowcheck21 = rowcheck21.drop("bochor")

In [61]:
row = rowcheck17.union(rowcheck18)

In [62]:
row1 = row.union(rowcheck19)

In [63]:
row2 = row1.union(rowcheck20)

In [64]:
row3 = row2.union(rowcheck21,['Year'],how='full')

In [65]:
row3.show()

                                                                                

+----+-----------------+-------------------+-----------------+-------------------+-----------------+-------------------+-----------------+-------------------+-----------------+-------------------+
|Year|totalrowcount2017|icebergrowcount2017|totalrowcount2018|icebergrowcount2018|totalrowcount2019|icebergrowcount2019|totalrowcount2020|icebergrowcount2020|totalrowcount2021|icebergrowcount2021|
+----+-----------------+-------------------+-----------------+-------------------+-----------------+-------------------+-----------------+-------------------+-----------------+-------------------+
|2018|             null|               null|          1955039|            1955039|             null|               null|             null|               null|             null|               null|
|2019|             null|               null|             null|               null|          2716880|            2716880|             null|               null|             null|               null|
|2020|         

In [66]:
row3 = row3.dropDuplicates(["Year"])

In [None]:
row3.show()

[Stage 334:(18 + 2) / 144][Stage 335:> (0 + 0) / 14][Stage 338:>(0 + 0) / 144]]]

In [15]:
pqtDF2021 = pqtDF.filter(pqtDF.Year == 2021)
df2021 = df.filter(df.Year == 2021)

In [16]:

pqtDFjune2021 = pqtDF2021.filter(pqtDF2021.Month == 6)
dfjune2021 = df2021.filter(df.Month == 6)

In [17]:
pqtDFjune2021.count()

                                                                                

323974

In [None]:
dfjune2021.count()

In [12]:
pqtDFfirst6months = pqtDF.filter((pqtDF.Month == 1) | (pqtDF.Month == 2) | (pqtDF.Month == 3) | (pqtDF.Month == 4) | (pqtDF.Month == 5) | (pqtDF.Month == 6))

In [13]:
pqtDFfirst6months.count()

                                                                                

4619775

In [8]:
from pyspark.sql.functions import year

pqtDF = pqtDF.withColumn('Year',year(pqtDF.date_of_registration))

In [9]:
pqtDF2017 = pqtDF.filter(pqtDF.Year == 2017)

In [10]:
pqtDF2017.createOrReplaceTempView("2017_table")

In [11]:
pq17 = spark.sql("SELECT Year,count(*) from 2017_table group by Year")

In [12]:
pq17.show()

                                                                                

+----+--------+
|Year|count(1)|
+----+--------+
|2017|  518623|
+----+--------+



In [13]:
pq17 = pq17.withColumnRenamed("count(1)","totalrows")

In [14]:
pqtDF2018 = pqtDF.filter(pqtDF.Year == 2018)

In [15]:
pqtDF2018.createOrReplaceTempView("2018_table")

In [16]:
pq18 = spark.sql("SELECT Year,count(*) from 2018_table group by Year")

In [17]:
pq18.show()

                                                                                

+----+--------+
|Year|count(1)|
+----+--------+
|2018| 1955039|
+----+--------+



In [18]:
pq18 = pq18.withColumnRenamed("count(1)","totalrows")

In [19]:
pqtDF2019 = pqtDF.filter(pqtDF.Year == 2019)

In [20]:
pqtDF2019.createOrReplaceTempView("2019_table")

In [21]:
pq19 = spark.sql("SELECT Year,count(*) from 2019_table group by Year")

In [22]:
pq19.show()

                                                                                

+----+--------+
|Year|count(1)|
+----+--------+
|2019| 2716880|
+----+--------+



In [23]:
pq19 = pq19.withColumnRenamed("count(1)","totalrows")

In [24]:
pqtDF2020 = pqtDF.filter(pqtDF.Year == 2020)

In [25]:
pqtDF2020.createOrReplaceTempView("2020_table")

In [26]:
pq20 = spark.sql("SELECT Year,count(*) from 2020_table group by Year")

In [27]:
pq20.show()

                                                                                

+----+--------+
|Year|count(1)|
+----+--------+
|2020| 2275356|
+----+--------+



In [28]:
pq20 = pq20.withColumnRenamed("count(1)","totalrows")

In [29]:
pqtDF2021 = pqtDF.filter(pqtDF.Year == 2021)

In [30]:
pqtDF2021.createOrReplaceTempView("2021_table")

In [31]:
pq21 = spark.sql("SELECT Year,count(*) from 2021_table group by Year")

In [32]:
pq21.show()

                                                                                

+----+--------+
|Year|count(1)|
+----+--------+
|2021| 3055323|
+----+--------+



In [45]:
pq21 = pq21.withColumnRenamed("count(1)","totalrows")

In [33]:
result = pq17.union(pq18)
result1 = result.union(pq19)
result2 = result1.union(pq20)
result3 = result2.union(pq21)

In [34]:
result3.show()

22/01/29 08:51:29 WARN WatchConnectionManager: Exec Failure        (1 + 1) / 14]
java.io.EOFException
	at okio.RealBufferedSource.require(RealBufferedSource.java:61)
	at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
	at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
	at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
	at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
	at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

+----+---------+
|Year|totalrows|
+----+---------+
|2017|   518623|
|2018|  1955039|
|2019|  2716880|
|2020|  2275356|
|2021|  3055323|
+----+---------+



22/01/29 09:01:30 WARN WatchConnectionManager: Exec Failure                     
java.io.EOFException
	at okio.RealBufferedSource.require(RealBufferedSource.java:61)
	at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
	at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
	at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
	at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
	at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
22/01/29 09:01:31 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is ex