## Importing basic libraries to use pyspark in jupyter notebook

In [1]:

from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import col

## Initializing spark session to connect to hdfs as well as hive

In [2]:
spark = SparkSession.builder \
    .enableHiveSupport() \
    .appName("HDFS to Jupyter") \
    .getOrCreate()

2024-09-15 14:27:39,707 WARN util.Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface enp0s3)
2024-09-15 14:27:39,709 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
2024-09-15 14:27:40,119 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Importing the csv file located at hdfs's dfs into notebook for data cleaning and exploration

In [3]:
df1 = spark.read.csv("hdfs://localhost:9000/user/hadoop/mtb_data/events.1724437428393", header=True, inferSchema=True)

                                                                                

In [4]:
df1.head

<bound method DataFrame.head of DataFrame[Event: string, Program stage: string, Test Date: string, Stored by: string, Created by: string, Last updated by: string, Last updated on: string, Scheduled date: string, Enrollment date: string, Incident date: string, Tracked entity instance: string, Program instance: string, Geometry: string, Longitude: double, Latitude: double, Organisation unit name: string, Organisation unit name hierarchy: string, Organisation unit code: string, Program status: string, Event status: string, Organisation unit: string, Test Requested: string, Ward Number: int, District: string, Sex: string, GeneXpert Test Result: string, Municipality: string, Patient ID: int, Age: string]>

## Dropping unnecessary columns that don't hold value to this project

In [5]:
df1 = df1.drop('Program stage','Test Date',"Event", "Stored by", "Created by", 
 "Last updated on", "Scheduled date", "Incident date","Tracked entity instance", 
"Program instance", "Geometry", "Longitude", "Latitude", 
"Program status", "Event status","Test Requested")

## Renaming all the columns into standard format 

In [6]:
new_column_names = {
    'Last updated by': 'last_updated_by',
    'Enrollment date': 'enrollment_date',
    'Organisation unit name': 'organisation_unit_name',
    'Organisation unit name hierarchy': 'organisation_unit_name_hierarchy',
    'Organisation unit code': 'organisation_unit_code',
    'Organisation unit': 'organisation_unit',
    'Ward Number': 'ward_number',
    'District': 'district',
    'Sex': 'sex',
    'GeneXpert Test Result': 'genexpert_test_result',
    'Municipality': 'municipality',
    'Patient ID': 'patient_id',
    'Age': 'age'
}

# Rename columns using the dictionary
for old_name, new_name in new_column_names.items():
    df1 = df1.withColumnRenamed(old_name, new_name)


## Converting pyspark dataframe into pandas dataframe for further cleaning and exploration

In [7]:
df = df1.toPandas()


                                                                                

In [8]:
df

Unnamed: 0,last_updated_by,enrollment_date,organisation_unit_name,organisation_unit_name_hierarchy,organisation_unit_code,organisation_unit,ward_number,district,sex,genexpert_test_result,municipality,patient_id,age
0,"Lumbini Medical College, Palpa_LAB (lmcp.lab)",2020-11-23 00:00:00.0,LUMBINI MEDICAL COLLEGE_PALPA,Nepal / 5 Lumbini Province / 506 PALPA / 50605...,14076,R0g8PTLzqmq,1.0,PALPA,Male,Rif Resistance DETECTED,50602 Purbakhola Rural Municipality,677367,45 to 54
1,"Prithivi Chandra Hospital, Nawalparasi_LAB (pc...",2020-12-01 00:00:00.0,PRITHIV CHANDRA HOSPITAL_ NAWALPARASI,Nepal / 5 Lumbini Province / 507 NAWALPARASI W...,13872,e8msu3rYPr7,2.0,NAWALPARASI EAST,Male,Rif Resistance NOT DETECTED,50706 Pratapapur Rural Municipality,354769,35 to 44
2,"NTC, Lab (ntc.lab1)",2020-06-24 00:00:00.0,NATIONAL TUBERCULOSIS CONTROL CENTRE_BHAKTAPUR,Nepal / 3 Bagmati Province / 307 BHAKTAPUR / 3...,11163,DAj3sgOnbDt,10.0,KATHMANDU,Female,Rif Resistance NOT DETECTED,,8800886,25 to 34
3,"MADHYABINDU DISTRICT HOSPITAL, NAWALPARASI_LAB...",2022-01-03 00:00:00.0,MIDPOINT DISTRICT HOSPITAL_NAWALPARASI EAST,Nepal / 4 Gandaki Province / 408 NAWALPARASI E...,13856,UPxxC2e48GJ,,NAWALPARASI EAST,Female,MTB NOT DETECTED,,3740427,45 to 54
4,"DISTRICT HOSPITAL_ UDAYAPUR, DISTRICT HOSPITAL...",2020-09-24 00:00:00.0,DISTRICT HOSPITAL_ UDAYAPUR,Nepal / 1 Koshi Province / 114 UDAYAPUR / 1140...,15722,M0h4xQCnr5Q,13.0,UDAYAPUR,Male,MTB NOT DETECTED,11403 Triyuga Municipality,3404379,35 to 44
...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,"DAMAULI HOSPITAL, TANAHU_LAB (ddht.lab)",2022-03-04 00:00:00.0,DAMAULI DISTRICT HOSPITAL_ TANAHU,Nepal / 4 Gandaki Province / 407 TANAHU / 4070...,15578,StYGSnQkGYd,6.0,TANAHU,Male,MTB NOT DETECTED,40709 Bandipur Rural Municipality,5539227,15 to 24
99996,"DISTRICT HOSPITAL_ DOTI, DISTRICT HOSPITAL_ DO...",2022-03-01 00:00:00.0,DISTRICT HOSPITAL_ DOTI,Nepal / 7 Sudurpashchim Province / 706 DOTI / ...,11854,xQ9yCoUNMvb,8.0,DOTI,Female,MTB NOT DETECTED,70604 Shikhar Municipality,8616709,65+
99997,"DISTRICT HOSPITAL_ DOTI, DISTRICT HOSPITAL_ DO...",2022-03-02 00:00:00.0,DISTRICT HOSPITAL_ DOTI,Nepal / 7 Sudurpashchim Province / 706 DOTI / ...,11854,xQ9yCoUNMvb,5.0,DOTI,Female,MTB NOT DETECTED,70602 Sayal Rural Municipality,462914,15 to 24
99998,"Tuberculosis Treatment Center,Pokhara, Pokhara...",2022-03-04 00:00:00.0,TUBERCULOSIS TREATMENT CENTER_POKHARA,Nepal / 4 Gandaki Province / 405 KASKI / 40504...,15903,gC0JprggWan,10.0,KASKI,Female,MTB NOT DETECTED,40504 Pokhara Metropolitan City,121239,65+


In [9]:
df1.dtypes

[('last_updated_by', 'string'),
 ('enrollment_date', 'string'),
 ('organisation_unit_name', 'string'),
 ('organisation_unit_name_hierarchy', 'string'),
 ('organisation_unit_code', 'string'),
 ('organisation_unit', 'string'),
 ('ward_number', 'int'),
 ('district', 'string'),
 ('sex', 'string'),
 ('genexpert_test_result', 'string'),
 ('municipality', 'string'),
 ('patient_id', 'int'),
 ('age', 'string')]

In [10]:
print(df.shape)
df.age.unique()


(100000, 13)


array(['45 to 54', '35 to 44', '25 to 34', '55 to 64', '0 to 14',
       '15 to 24', '65+', None], dtype=object)

In [11]:
# Print the shape of the DataFrame
num_rows = df1.count()
num_columns = len(df1.columns)
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_columns}")

# Get unique values in the 'age' column
unique_ages = df1.select('age').distinct().collect()
unique_ages_list = [row['age'] for row in unique_ages]
print("Unique ages:", unique_ages_list)

Number of rows: 100000
Number of columns: 13
Unique ages: ['45 to 54', None, '15 to 24', '0 to 14', '35 to 44', '55 to 64', '65+', '25 to 34']


# Removing rows having None age 

In [12]:
df = df[df['age'].notna() ]

In [13]:
# Filter out rows where 'age' column is not null
df1_filtered = df1.filter(df1['age'].isNotNull())


In [14]:
print(df.shape)
df.age.unique()

(99829, 13)


array(['45 to 54', '35 to 44', '25 to 34', '55 to 64', '0 to 14',
       '15 to 24', '65+'], dtype=object)

In [15]:
# Print the shape of the DataFrame
num_rows = df1_filtered.count()
num_columns = len(df1.columns)
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_columns}")
# Get unique values in the 'age' column
unique_ages = df1.select('age').distinct().collect()
unique_ages_list = [row['age'] for row in unique_ages]
print("Unique ages:", unique_ages_list)

Number of rows: 99829
Number of columns: 13
Unique ages: ['45 to 54', None, '15 to 24', '0 to 14', '35 to 44', '55 to 64', '65+', '25 to 34']




## Cleaning NULL values

In [16]:
df_cleaned = df.dropna(how='any')

In [17]:
df1_filtered = df1_filtered.dropna(how='any')
# Print the shape of the DataFrame
num_rows = df1_filtered.count()
num_columns = len(df1.columns)
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_columns}")

Number of rows: 22914
Number of columns: 13


In [18]:
print(df_cleaned.shape)
df_cleaned.genexpert_test_result.unique()


(22914, 13)


array(['Rif Resistance DETECTED', 'Rif Resistance NOT DETECTED',
       'MTB NOT DETECTED', 'Error', 'NO RESULT', 'Invalid',
       'Rif Indeterminate'], dtype=object)

In [19]:
# Get unique values in the 'age' column
unique_results = df1_filtered.select('genexpert_test_result').distinct().collect()
unique_results_list = [row['genexpert_test_result'] for row in unique_results]
print("unique_results:", unique_results_list)

unique_results: ['MTB NOT DETECTED', 'NO RESULT', 'Rif Resistance DETECTED', 'Invalid', 'Rif Indeterminate', 'Rif Resistance NOT DETECTED', 'Error']


## Cleaning Error , Invalid , Noresult and Null Result from test output

In [20]:
df_cleaned = df_cleaned[~df_cleaned['genexpert_test_result'].isin (['Error','Invalid',None,'NO RESULT'])]

In [21]:
print(df_cleaned.shape)
df_cleaned.genexpert_test_result.unique()

(21505, 13)


array(['Rif Resistance DETECTED', 'Rif Resistance NOT DETECTED',
       'MTB NOT DETECTED', 'Rif Indeterminate'], dtype=object)

In [22]:
df1_filtered2 = df1_filtered.filter(col('genexpert_test_result').isin(['MTB NOT DETECTED',  'Rif Resistance DETECTED', 'Rif Indeterminate', 'Rif Resistance NOT DETECTED']))

In [23]:
# Print the shape of the DataFrame
num_rows = df1_filtered2.count()
num_columns = len(df1.columns)
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_columns}")
# Get unique values in the 'age' column
unique_results = df1_filtered2.select('genexpert_test_result').distinct().collect()
unique_results_list = [row['genexpert_test_result'] for row in unique_results]
print("unique_results:", unique_results_list)

Number of rows: 21505
Number of columns: 13
unique_results: ['MTB NOT DETECTED', 'Rif Resistance DETECTED', 'Rif Indeterminate', 'Rif Resistance NOT DETECTED']


In [24]:
df_cleaned

Unnamed: 0,last_updated_by,enrollment_date,organisation_unit_name,organisation_unit_name_hierarchy,organisation_unit_code,organisation_unit,ward_number,district,sex,genexpert_test_result,municipality,patient_id,age
0,"Lumbini Medical College, Palpa_LAB (lmcp.lab)",2020-11-23 00:00:00.0,LUMBINI MEDICAL COLLEGE_PALPA,Nepal / 5 Lumbini Province / 506 PALPA / 50605...,14076,R0g8PTLzqmq,1.0,PALPA,Male,Rif Resistance DETECTED,50602 Purbakhola Rural Municipality,677367,45 to 54
1,"Prithivi Chandra Hospital, Nawalparasi_LAB (pc...",2020-12-01 00:00:00.0,PRITHIV CHANDRA HOSPITAL_ NAWALPARASI,Nepal / 5 Lumbini Province / 507 NAWALPARASI W...,13872,e8msu3rYPr7,2.0,NAWALPARASI EAST,Male,Rif Resistance NOT DETECTED,50706 Pratapapur Rural Municipality,354769,35 to 44
4,"DISTRICT HOSPITAL_ UDAYAPUR, DISTRICT HOSPITAL...",2020-09-24 00:00:00.0,DISTRICT HOSPITAL_ UDAYAPUR,Nepal / 1 Koshi Province / 114 UDAYAPUR / 1140...,15722,M0h4xQCnr5Q,13.0,UDAYAPUR,Male,MTB NOT DETECTED,11403 Triyuga Municipality,3404379,35 to 44
5,"SETI ZONAL HOSPITAL, KAILALI_LAB (szhk.lab)",2020-12-22 00:00:00.0,SETI ZONAL HOSPITAL_ KAILALI,Nepal / 7 Sudurpashchim Province / 708 KAILALI...,12352,CZSqk9wH36L,2.0,KAILALI,Male,MTB NOT DETECTED,70804 Gauriganga Municipality,6651496,55 to 64
30,"DISTRICT HOSPITAL_ UDAYAPUR, DISTRICT HOSPITAL...",2020-11-25 00:00:00.0,DISTRICT HOSPITAL_ UDAYAPUR,Nepal / 1 Koshi Province / 114 UDAYAPUR / 1140...,15722,M0h4xQCnr5Q,7.0,UDAYAPUR,Female,MTB NOT DETECTED,11404 Rautamai Rural Municipality,8715899,25 to 34
...,...,...,...,...,...,...,...,...,...,...,...,...,...
99994,"NEPALGUNJ MEDICAL COLLEGE TEACHING HOSPITAL, K...",2022-03-04 00:00:00.0,NEPALGUNJ MEDICAL COLLEGE TEACHING HOSPITAL_KO...,Nepal / 5 Lumbini Province / 511 BANKE / 51102...,10931,MVM4GjEbulf,12.0,JAJARKOT,Male,MTB NOT DETECTED,60704 Chhedagad Municipality,2651546,65+
99995,"DAMAULI HOSPITAL, TANAHU_LAB (ddht.lab)",2022-03-04 00:00:00.0,DAMAULI DISTRICT HOSPITAL_ TANAHU,Nepal / 4 Gandaki Province / 407 TANAHU / 4070...,15578,StYGSnQkGYd,6.0,TANAHU,Male,MTB NOT DETECTED,40709 Bandipur Rural Municipality,5539227,15 to 24
99996,"DISTRICT HOSPITAL_ DOTI, DISTRICT HOSPITAL_ DO...",2022-03-01 00:00:00.0,DISTRICT HOSPITAL_ DOTI,Nepal / 7 Sudurpashchim Province / 706 DOTI / ...,11854,xQ9yCoUNMvb,8.0,DOTI,Female,MTB NOT DETECTED,70604 Shikhar Municipality,8616709,65+
99997,"DISTRICT HOSPITAL_ DOTI, DISTRICT HOSPITAL_ DO...",2022-03-02 00:00:00.0,DISTRICT HOSPITAL_ DOTI,Nepal / 7 Sudurpashchim Province / 706 DOTI / ...,11854,xQ9yCoUNMvb,5.0,DOTI,Female,MTB NOT DETECTED,70602 Sayal Rural Municipality,462914,15 to 24


In [25]:
df_cleaned = df_cleaned.reset_index().drop(columns=['index'])
df_cleaned

Unnamed: 0,last_updated_by,enrollment_date,organisation_unit_name,organisation_unit_name_hierarchy,organisation_unit_code,organisation_unit,ward_number,district,sex,genexpert_test_result,municipality,patient_id,age
0,"Lumbini Medical College, Palpa_LAB (lmcp.lab)",2020-11-23 00:00:00.0,LUMBINI MEDICAL COLLEGE_PALPA,Nepal / 5 Lumbini Province / 506 PALPA / 50605...,14076,R0g8PTLzqmq,1.0,PALPA,Male,Rif Resistance DETECTED,50602 Purbakhola Rural Municipality,677367,45 to 54
1,"Prithivi Chandra Hospital, Nawalparasi_LAB (pc...",2020-12-01 00:00:00.0,PRITHIV CHANDRA HOSPITAL_ NAWALPARASI,Nepal / 5 Lumbini Province / 507 NAWALPARASI W...,13872,e8msu3rYPr7,2.0,NAWALPARASI EAST,Male,Rif Resistance NOT DETECTED,50706 Pratapapur Rural Municipality,354769,35 to 44
2,"DISTRICT HOSPITAL_ UDAYAPUR, DISTRICT HOSPITAL...",2020-09-24 00:00:00.0,DISTRICT HOSPITAL_ UDAYAPUR,Nepal / 1 Koshi Province / 114 UDAYAPUR / 1140...,15722,M0h4xQCnr5Q,13.0,UDAYAPUR,Male,MTB NOT DETECTED,11403 Triyuga Municipality,3404379,35 to 44
3,"SETI ZONAL HOSPITAL, KAILALI_LAB (szhk.lab)",2020-12-22 00:00:00.0,SETI ZONAL HOSPITAL_ KAILALI,Nepal / 7 Sudurpashchim Province / 708 KAILALI...,12352,CZSqk9wH36L,2.0,KAILALI,Male,MTB NOT DETECTED,70804 Gauriganga Municipality,6651496,55 to 64
4,"DISTRICT HOSPITAL_ UDAYAPUR, DISTRICT HOSPITAL...",2020-11-25 00:00:00.0,DISTRICT HOSPITAL_ UDAYAPUR,Nepal / 1 Koshi Province / 114 UDAYAPUR / 1140...,15722,M0h4xQCnr5Q,7.0,UDAYAPUR,Female,MTB NOT DETECTED,11404 Rautamai Rural Municipality,8715899,25 to 34
...,...,...,...,...,...,...,...,...,...,...,...,...,...
21500,"NEPALGUNJ MEDICAL COLLEGE TEACHING HOSPITAL, K...",2022-03-04 00:00:00.0,NEPALGUNJ MEDICAL COLLEGE TEACHING HOSPITAL_KO...,Nepal / 5 Lumbini Province / 511 BANKE / 51102...,10931,MVM4GjEbulf,12.0,JAJARKOT,Male,MTB NOT DETECTED,60704 Chhedagad Municipality,2651546,65+
21501,"DAMAULI HOSPITAL, TANAHU_LAB (ddht.lab)",2022-03-04 00:00:00.0,DAMAULI DISTRICT HOSPITAL_ TANAHU,Nepal / 4 Gandaki Province / 407 TANAHU / 4070...,15578,StYGSnQkGYd,6.0,TANAHU,Male,MTB NOT DETECTED,40709 Bandipur Rural Municipality,5539227,15 to 24
21502,"DISTRICT HOSPITAL_ DOTI, DISTRICT HOSPITAL_ DO...",2022-03-01 00:00:00.0,DISTRICT HOSPITAL_ DOTI,Nepal / 7 Sudurpashchim Province / 706 DOTI / ...,11854,xQ9yCoUNMvb,8.0,DOTI,Female,MTB NOT DETECTED,70604 Shikhar Municipality,8616709,65+
21503,"DISTRICT HOSPITAL_ DOTI, DISTRICT HOSPITAL_ DO...",2022-03-02 00:00:00.0,DISTRICT HOSPITAL_ DOTI,Nepal / 7 Sudurpashchim Province / 706 DOTI / ...,11854,xQ9yCoUNMvb,5.0,DOTI,Female,MTB NOT DETECTED,70602 Sayal Rural Municipality,462914,15 to 24


In [26]:
df_cleaned.to_csv('df_cleaned.csv', sep=',', index=False, encoding='utf-8')

In [27]:
df1_filtered2.show()

+--------------------+--------------------+----------------------+--------------------------------+----------------------+-----------------+-----------+----------------+------+---------------------+--------------------+----------+--------+
|     last_updated_by|     enrollment_date|organisation_unit_name|organisation_unit_name_hierarchy|organisation_unit_code|organisation_unit|ward_number|        district|   sex|genexpert_test_result|        municipality|patient_id|     age|
+--------------------+--------------------+----------------------+--------------------------------+----------------------+-----------------+-----------+----------------+------+---------------------+--------------------+----------+--------+
|Lumbini Medical C...|2020-11-23 00:00:...|  LUMBINI MEDICAL C...|            Nepal / 5 Lumbini...|                 14076|      R0g8PTLzqmq|          1|           PALPA|  Male| Rif Resistance DE...|50602 Purbakhola ...|    677367|45 to 54|
|Prithivi Chandra ...|2020-12-01 00:00:.

## Converting pandas dataframe to pyspark dataframe to load into hive 

In [28]:
df_forhive = spark.createDataFrame(df_cleaned)

In [29]:
df_forhive.printSchema()

root
 |-- last_updated_by: string (nullable = true)
 |-- enrollment_date: string (nullable = true)
 |-- organisation_unit_name: string (nullable = true)
 |-- organisation_unit_name_hierarchy: string (nullable = true)
 |-- organisation_unit_code: string (nullable = true)
 |-- organisation_unit: string (nullable = true)
 |-- ward_number: double (nullable = true)
 |-- district: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- genexpert_test_result: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- patient_id: long (nullable = true)
 |-- age: string (nullable = true)



In [31]:
df_forhive.show()

2024-09-15 14:28:19,944 WARN scheduler.TaskSetManager: Stage 23 contains a task of very large size (1102 KiB). The maximum recommended task size is 1000 KiB.
2024-09-15 14:28:19,961 ERROR executor.Executor: Exception in task 0.0 in stage 23.0 (TID 870)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark3/spark-3.1.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 477, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.6 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.

Py4JJavaError: An error occurred while calling o120.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 870) (10.0.2.15 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark3/spark-3.1.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 477, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.6 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark3/spark-3.1.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 477, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.6 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


## As we have already created database in hive, we will be using the database named 'my_databasemtb'

In [32]:
df1_filtered2.printSchema()

root
 |-- last_updated_by: string (nullable = true)
 |-- enrollment_date: string (nullable = true)
 |-- organisation_unit_name: string (nullable = true)
 |-- organisation_unit_name_hierarchy: string (nullable = true)
 |-- organisation_unit_code: string (nullable = true)
 |-- organisation_unit: string (nullable = true)
 |-- ward_number: integer (nullable = true)
 |-- district: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- genexpert_test_result: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- patient_id: integer (nullable = true)
 |-- age: string (nullable = true)



In [33]:
#spark.sql("DROP DATABASE IF EXISTS mtb_database CASCADE")

In [34]:
# Create the Hive database if it doesn't exist
# spark.sql("CREATE DATABASE IF NOT EXISTS my_database")
spark.sql("USE my_databasemtb")

2024-09-15 14:28:25,027 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2024-09-15 14:28:25,029 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2024-09-15 14:28:27,229 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


DataFrame[]

## Creating table in that database or if table exists, we drop it and create new table

In [35]:
spark.sql("DROP TABLE IF EXISTS my_databasemtb.mtb_table")

DataFrame[]

In [36]:
# df_forhive.write.mode("overwrite").option("path", "hdfs://localhost:9000/user/hive/warehouse/my_databasemtb.db/mtb_table").saveAsTable("my_databasemtb.mtb_table")
df1_filtered2.write.mode("overwrite").saveAsTable("my_databasemtb.mtb_table")

2024-09-15 14:28:31,102 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
2024-09-15 14:28:31,186 WARN conf.HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
2024-09-15 14:28:31,186 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2024-09-15 14:28:31,187 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist


## Showing tables in the database

In [37]:
spark.sql("SHOW TABLES IN my_databasemtb").show()

+--------------+---------+-----------+
|      database|tableName|isTemporary|
+--------------+---------+-----------+
|my_databasemtb|mtb_table|      false|
+--------------+---------+-----------+



In [38]:
spark.sql("SELECT * FROM my_databasemtb.mtb_table").show()


+--------------------+--------------------+----------------------+--------------------------------+----------------------+-----------------+-----------+----------+------+---------------------+--------------------+----------+--------+
|     last_updated_by|     enrollment_date|organisation_unit_name|organisation_unit_name_hierarchy|organisation_unit_code|organisation_unit|ward_number|  district|   sex|genexpert_test_result|        municipality|patient_id|     age|
+--------------------+--------------------+----------------------+--------------------------------+----------------------+-----------------+-----------+----------+------+---------------------+--------------------+----------+--------+
|PIPARA SIMARA PHC...|2022-01-02 00:00:...|  JEETPUR SIMARA HO...|            Nepal / 2 Madhesh...|                 11045|      lCbbjLpwRA5|         15|      BARA|Female| Rif Resistance NO...|20703 Jitpur Sima...|   3156371|     65+|
|INARUWA HOSPITAL_...|2022-01-02 00:00:...|  INARUWA HOSPITAL_..