In [68]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [69]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

In [44]:
schema = StructType([StructField('ID_A',StringType(),False),
                    StructField('USER_NAME_A',StringType(),False),
                    StructField('DEPARTMENT_CODE',IntegerType(),False),
                    StructField('INIT_APPLICATION',IntegerType(),False),
                    StructField('EMAIL_A',StringType(),False),
                    ])

In [45]:
df_hdfs = spark.read.option("header","true").option("delimiter",",").option("inferSchema","true").csv("hdfs://master.com:8020/user/value/user.txt")

In [46]:
df_hdfs.show()

+-------+-----------+---------------+----------------+------------------+
|   ID_A|USER_NAME_A|DEPARTMENT_CODE|INIT_APPLICATION|           EMAIL_A|
+-------+-----------+---------------+----------------+------------------+
|B-00001|     B00001|            555|            ?278|B00001@biat.com.tn|
|B-00002|     B00002|             35|            ?233|B00002@biat.com.tn|
|B-00003|     B00003|            299|            ?202|B00003@biat.com.tn|
|B-00004|     B00004|             41|            ?265|B00004@biat.com.tn|
|B-00005|     B00005|            605|            ?266|B00005@biat.com.tn|
|B-00006|     B00006|            125|            ?233|B00006@biat.com.tn|
|B-00007|     B00007|            103|            ?202|B00007@biat.com.tn|
+-------+-----------+---------------+----------------+------------------+



In [47]:
df_hdfs.printSchema()

root
 |-- ID_A: string (nullable = true)
 |-- USER_NAME_A: string (nullable = true)
 |-- DEPARTMENT_CODE: integer (nullable = true)
 |-- INIT_APPLICATION: string (nullable = true)
 |-- EMAIL_A: string (nullable = true)



In [48]:
import pyspark.sql.functions as f

In [49]:
df_hdfs.join(df_hdfs.groupBy(df_hdfs.columns).agg((f.count("*")>1).cast("int").alias("Duplicate")),df_hdfs.columns,how="inner").show()

+-------+-----------+---------------+----------------+------------------+---------+
|   ID_A|USER_NAME_A|DEPARTMENT_CODE|INIT_APPLICATION|           EMAIL_A|Duplicate|
+-------+-----------+---------------+----------------+------------------+---------+
|B-00004|     B00004|             41|            ?265|B00004@biat.com.tn|        0|
|B-00003|     B00003|            299|            ?202|B00003@biat.com.tn|        0|
|B-00006|     B00006|            125|            ?233|B00006@biat.com.tn|        0|
|B-00005|     B00005|            605|            ?266|B00005@biat.com.tn|        0|
|B-00001|     B00001|            555|            ?278|B00001@biat.com.tn|        0|
|B-00007|     B00007|            103|            ?202|B00007@biat.com.tn|        0|
|B-00002|     B00002|             35|            ?233|B00002@biat.com.tn|        0|
+-------+-----------+---------------+----------------+------------------+---------+



In [50]:
import time
import datetime
from pyspark.sql.functions import lit,unix_timestamp
datetime

<module 'datetime' from '/home/value/anaconda3/lib/python3.7/datetime.py'>

In [51]:
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')

In [52]:
newdf = df_hdfs.withColumn('DATDEBVLD',unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp")).withColumn('DATCHG',unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))

In [53]:
newdf.show()

+-------+-----------+---------------+----------------+------------------+-------------------+-------------------+
|   ID_A|USER_NAME_A|DEPARTMENT_CODE|INIT_APPLICATION|           EMAIL_A|          DATDEBVLD|             DATCHG|
+-------+-----------+---------------+----------------+------------------+-------------------+-------------------+
|B-00001|     B00001|            555|            ?278|B00001@biat.com.tn|2021-07-23 09:17:08|2021-07-23 09:17:08|
|B-00002|     B00002|             35|            ?233|B00002@biat.com.tn|2021-07-23 09:17:08|2021-07-23 09:17:08|
|B-00003|     B00003|            299|            ?202|B00003@biat.com.tn|2021-07-23 09:17:08|2021-07-23 09:17:08|
|B-00004|     B00004|             41|            ?265|B00004@biat.com.tn|2021-07-23 09:17:08|2021-07-23 09:17:08|
|B-00005|     B00005|            605|            ?266|B00005@biat.com.tn|2021-07-23 09:17:08|2021-07-23 09:17:08|
|B-00006|     B00006|            125|            ?233|B00006@biat.com.tn|2021-07-23 09:1

In [54]:
findata=newdf.withColumn("DATFINVLD",lit(999))

In [55]:
findata.show()

+-------+-----------+---------------+----------------+------------------+-------------------+-------------------+---------+
|   ID_A|USER_NAME_A|DEPARTMENT_CODE|INIT_APPLICATION|           EMAIL_A|          DATDEBVLD|             DATCHG|DATFINVLD|
+-------+-----------+---------------+----------------+------------------+-------------------+-------------------+---------+
|B-00001|     B00001|            555|            ?278|B00001@biat.com.tn|2021-07-23 09:17:08|2021-07-23 09:17:08|      999|
|B-00002|     B00002|             35|            ?233|B00002@biat.com.tn|2021-07-23 09:17:08|2021-07-23 09:17:08|      999|
|B-00003|     B00003|            299|            ?202|B00003@biat.com.tn|2021-07-23 09:17:08|2021-07-23 09:17:08|      999|
|B-00004|     B00004|             41|            ?265|B00004@biat.com.tn|2021-07-23 09:17:08|2021-07-23 09:17:08|      999|
|B-00005|     B00005|            605|            ?266|B00005@biat.com.tn|2021-07-23 09:17:08|2021-07-23 09:17:08|      999|
|B-00006

In [56]:
findata.coalesce(1).write.csv("hdfs://master.com:8020/user/value/hdfs-user.csv")

In [57]:
df_load = spark.read.option("header","true").csv('hdfs://master.com:8020/user/value/hdfs-user.csv')

In [58]:
df_load.show()

+-------+------+---+----+------------------+------------------------------+------------------------------+---+
|B-00001|B00001|555|?278|B00001@biat.com.tn|2021-07-23T09:17:08.000+01:005|2021-07-23T09:17:08.000+01:006|999|
+-------+------+---+----+------------------+------------------------------+------------------------------+---+
|B-00002|B00002| 35|?233|B00002@biat.com.tn|          2021-07-23T09:17:...|          2021-07-23T09:17:...|999|
|B-00003|B00003|299|?202|B00003@biat.com.tn|          2021-07-23T09:17:...|          2021-07-23T09:17:...|999|
|B-00004|B00004| 41|?265|B00004@biat.com.tn|          2021-07-23T09:17:...|          2021-07-23T09:17:...|999|
|B-00005|B00005|605|?266|B00005@biat.com.tn|          2021-07-23T09:17:...|          2021-07-23T09:17:...|999|
|B-00006|B00006|125|?233|B00006@biat.com.tn|          2021-07-23T09:17:...|          2021-07-23T09:17:...|999|
|B-00007|B00007|103|?202|B00007@biat.com.tn|          2021-07-23T09:17:...|          2021-07-23T09:17:...|999|
+

In [59]:
from pyhive  import  hive
from pyhive import  presto

In [60]:
conn = hive.Connection(host="localhost",port=10000,username="value",database="default")

In [61]:
cnx = conn.cursor()

In [62]:
cnx.execute("CREATE TABLE  master ( ID_A string, USER_NAME_A string, DEPARTMENT_CODE string, INIT_APPLICATION string, EMAIL_A string,DATDEBVLD Timestamp,DATCHG Timestamp, DATFINVLD string)ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")

In [63]:
cnx.execute("load data  INPATH 'hdfs://master.com:8020/user/value/hdfs-user.csv' overwrite into table master")

In [64]:
cnx.execute("select * from master")

In [65]:
newschema = cnx.fetchall()

In [66]:
df1= spark.createDataFrame(newschema,['ID_A','USER_NAME_A','DEPARTMENT_CODE','INIT_APPLICATION','EMAIL_A','DATDEBVLD','DATCHG','DATFINVLD'])


In [67]:
df1.show()

+-------+-----------+---------------+----------------+------------------+-------------------+-------------------+---------+
|   ID_A|USER_NAME_A|DEPARTMENT_CODE|INIT_APPLICATION|           EMAIL_A|          DATDEBVLD|             DATCHG|DATFINVLD|
+-------+-----------+---------------+----------------+------------------+-------------------+-------------------+---------+
|B-00001|     B00001|            555|            ?278|B00001@biat.com.tn|2021-07-23 00:00:00|2021-07-23 00:00:00|      999|
|B-00002|     B00002|             35|            ?233|B00002@biat.com.tn|2021-07-23 00:00:00|2021-07-23 00:00:00|      999|
|B-00003|     B00003|            299|            ?202|B00003@biat.com.tn|2021-07-23 00:00:00|2021-07-23 00:00:00|      999|
|B-00004|     B00004|             41|            ?265|B00004@biat.com.tn|2021-07-23 00:00:00|2021-07-23 00:00:00|      999|
|B-00005|     B00005|            605|            ?266|B00005@biat.com.tn|2021-07-23 00:00:00|2021-07-23 00:00:00|      999|
|B-00006