In [1]:
#imports
from selenium import webdriver 
from selenium.webdriver.chrome.service import Service as ChromeService 
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.selenium_manager import SeleniumManager 
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import Select
from time import sleep
from googletrans import Translator
from google_trans_new import google_translator  
from deep_translator import GoogleTranslator
import os
from bs4 import BeautifulSoup
import numpy as np
import pandas as pd
import pandas as pd
from sqlalchemy import create_engine
import warnings
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, StringType
from tqdm import tqdm
import pyspark.pandas as ps
warnings.filterwarnings('ignore')



In [2]:
#Functions
def translator(s,source):
    return GoogleTranslator(source=source, target='en').translate(s)


def translator_v2(s,source='auto'):
        translator = Translator()
        return translator.translate(s, src='hi',dest='en').text if s != '' else 'null' 

translate_udf = udf(translator_v2,StringType())

In [3]:
#selenium webdriver
input_params_dropdown_by_id = {}
input_params_dropdown_by_id['dbselect'] = '2023' #Select Year
input_params_dropdown_by_id['district_id'] = 'मुंबई उपनगर' #District
input_params_dropdown_by_id['taluka_id'] = 'अंधेरी' #Taluka
input_params_dropdown_by_id['village_id'] = 'बांद्रा' #Village

input_params_text_by_id = {}
input_params_text_by_id['free_text'] = '2023' 

url = 'https://pay2igr.igrmaharashtra.gov.in/eDisplay/Propertydetails/index' 
driver = webdriver.Chrome() 
driver.implicitly_wait(10)
driver.get(url)

for key, value in input_params_dropdown_by_id.items():
	print(key,value)
	dropdown = driver.find_element(by=By.ID,value=key)
	dropdown_select = Select(dropdown)
	for option in dropdown_select.options:
		if option.text == value:
			option.click()
			print(f'Clicked {option.text} for {key}')
			break
	driver.implicitly_wait(2)

#Input reg year
driver.find_element(by=By.ID, value='free_text').send_keys(input_params_text_by_id['free_text'])
driver.implicitly_wait(2)

#Input Captcha
captcha = input()
driver.find_element(by=By.ID, value='cpatchaTextBox').send_keys(captcha)
print(f'Entered Captcha {captcha}')
driver.implicitly_wait(2)

#Submit to get result set
driver.find_element(by=By.ID,value='submit').click()
driver.implicitly_wait(10)


#Click on 50 Pages
dropdown = driver.find_element(by=By.NAME, value='tableparty_length')
dropdown_select = Select(dropdown)
for option in dropdown_select.options:
	if option.text == 'All':
		option.click()
		print('Set to All Records')
		break

print('So Far, So Good')

dbselect 2023
Clicked 2023 for dbselect
district_id मुंबई उपनगर
Clicked मुंबई उपनगर for district_id
taluka_id अंधेरी
Clicked अंधेरी for taluka_id
village_id बांद्रा
Clicked बांद्रा for village_id
Entered Captcha 18PUK2
Set to All Records
So Far, So Good


In [10]:
#scrap records
records_raw = pd.DataFrame(columns=['अनु क्र.','दस्त क्र.','दस्त प्रकार','दू. नि. कार्यालय','वर्ष','लिहून देणार','लिहून घेणार','इतर माहीती','सूची क्र. २'])

print('Data Loading...')
for index, table in enumerate(tqdm(driver.find_elements(by=By.ID, value='tbdata'))):
    data = [item.text if item.text != 'सूची क्र. २' else item.find_element(by=By.TAG_NAME,value='a').get_attribute('href') for item in table.find_elements(by=By.XPATH, value=".//*[self::td or self::th]")]
    records_raw.loc[len(records_raw)] = data
    # print(data)
    # print(f'{index} Row appended',)

print('Finished!')
records_raw
records_translate = records_raw.copy()

Data Loading...


100%|██████████| 1209/1209 [03:09<00:00,  6.37it/s]

Finished!





In [11]:
# Insert Scraped Raw Data in postgres
db_user = 'postgres'
db_password = 'Sunrise12345'
db_host = '127.0.0.1'
db_port = '5432'
db_name = 'propReturns'

engine = create_engine(f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')
table_name = 'record_details_raw'

records_raw.to_sql(table_name, engine, if_exists='replace', index=True)

print(f"{len(records_raw)} DataFrame has been inserted into the '{table_name}' table.")

1209 DataFrame has been inserted into the 'record_details_raw' table.


In [8]:
#Approach Via Pandas --> Extremely Slow
records_translate = records_raw.copy()
records_translate.columns = [translator(col,source='auto') for col in records_translate.columns]
cols_to_translate = [
    'diarrhea type',
    'Du. Prohibit. Office',
    'Will write', 
    'Will write down', 
    'Other information'
    ]

records_translated = pd.DataFrame(columns=records_translate.columns)
batch_size = 100
start = 0
for batch in range(start,len(records_translate),batch_size):
    print(f'Batch -> [{batch},{batch+batch_size}]')
    temp_df = records_translate.iloc[batch:batch+batch_size,:].copy()
    for col in cols_to_translate:
        print(f'Translating {col}')
        temp_df[col] = temp_df[col].apply(lambda x: translator(x,source='hi'))
        print(f'Done')
    records_translated = pd.concat([records_translated,temp_df])
records_translated.rename(columns={'Will write':'buyer_name','Will write down':'seller_name'},inplace=True)

Batch -> [0,100]
Translating diarrhea type
Done
Translating Du. Prohibit. Office
Done
Translating Will write


RequestError: Request exception can happen due to an api connection error. Please check your connection and try again

In [12]:
# del os.environ['PYSPARK_PYTHON'] 
# del os.environ['PYSPARK_DRIVER_PYTHON'] 
os.environ['PYSPARK_PYTHON'] = sys.executable 
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [14]:
spark.stop()
del spark

In [15]:
# Approach Via Spark
spark = SparkSession.builder.getOrCreate()
# spark = SparkSession.builder \
#         .config('spark.executor.instances', 4) \
#         .config("spark.sql.execution.arrow.enabled", "true") \
#         .config("spark.dynamicAllocation.enabled", "true") \
#         .config("spark.dynamicAllocation.minExecutors","1") \
#         .config("spark.dynamicAllocation.maxExecutors","5") \
#         .config("spark.executor.cores",6) \
#         .config("spark.executor.memory","2g") \
#         .getOrCreate()
# spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# #spark.conf.set("spark.dynamicAllocation.enabled", "true")
# spark.conf.set("spark.executor.cores", 4)
# spark.conf.set("spark.dynamicAllocation.minExecutors","1")
# spark.conf.set("spark.dynamicAllocation.maxExecutors","5")
# conf = spark.sparkContext.getConf()
spark

In [16]:
#Get the Data from SQL directly
host = '127.0.0.1'
port = '5432'
database = 'propReturns'
username = 'postgres'
password = 'Sunrise12345'
url = f"jdbc:postgresql://{host}:{port}/{database}"
table_name = 'record_details_raw'

engine = create_engine(f'postgresql://{username}:{password}@{host}:{port}/{database}')

spark_df = spark.read.format("jdbc") \
        .option("url", url) \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

records_raw = pd.read_sql('record_details_raw',engine)

In [17]:
new_names = []
for col in records_raw.columns:
    new_names.append(translator(col,source='auto'))

new_names = [x.replace('.','') for x in new_names]
spark_df = spark_df.toDF(*new_names)

In [18]:
spark_df.show()

+-----+-----+-----------+--------------------+------------------+----------+--------------------+--------------------+--------------------+--------------------+
|index|Sl no|Diarrhea no|       diarrhea type|Du Prohibit Office|      Year|          Will write|     Will write down|   Other information|           List no 2|
+-----+-----+-----------+--------------------+------------------+----------+--------------------+--------------------+--------------------+--------------------+
|    0|    1|      13217|           भाडेपट्टा|सह दु.नि. अंधेरी 7|25/07/2023|1) श्रीमती चंद्रक...|1) महाराष्ट़ हाऊस...|1) इतर माहिती: पि...|https://pay2igr.i...|
|    1|    2|       4286|       विकसनकरारनामा|सह दु.नि. अंधेरी 7|10/03/2023|1) अजित सिंह करता...|1) नाविश रियल्टी ...|1) इतर माहिती: जम...|https://pay2igr.i...|
|    2|    3|       2449|             सेल डीड|सह दु.नि. अंधेरी 7|09/02/2023|1) नूपुर अनिल कळक...|1) हर्ष सतपाल मल्...|1) इतर माहिती: सद...|https://pay2igr.i...|
|    3|    4|       8691|66-नोटीस 

In [19]:
cols_to_translate = [
    'diarrhea type',
    'Du Prohibit Office',
    'Will write', 
    'Will write down', 
    'Other information'
    ]

for column in cols_to_translate:
    spark_df = spark_df.withColumn(column, translate_udf(spark_df[column]))

spark_df = spark_df.withColumnRenamed(existing='Will write',new='buyer_name')
spark_df = spark_df.withColumnRenamed(existing='Will write down',new='seller_name')

In [10]:
spark_df.show()

+-----+-----+-----------+--------------------+------------------+----------+--------------------+--------------------+--------------------+--------------------+
|index|Sl no|Diarrhea no|       diarrhea type|Du Prohibit Office|      Year|          buyer_name|         seller_name|   Other information|           List no 2|
+-----+-----+-----------+--------------------+------------------+----------+--------------------+--------------------+--------------------+--------------------+
|    0|    1|       4286|development agree...|Co. D.N. Andheri 7|10/03/2023|1) Ajit Singh Kar...|1) Paresh Ranchho...|1) Other informat...|https://pay2igr.i...|
|    1|    2|      13217|               lease|Co. D.N. Andheri 7|25/07/2023|1) Bhavin Sheth, ...|1) Hanmant Dhanva...|1) Other informat...|https://pay2igr.i...|
|    2|    3|       2449|           sale deed|Co. D.N. Andheri 7|09/02/2023|1) Nupur Anil Kal...|1) Harsh Satpal M...|1) Other Informat...|https://pay2igr.i...|
|    3|    4|       8691|66-Notice

In [21]:
table_name_sink = 'record_details_translated'

spark_df.select("*").write.format("jdbc")\
    .option("url", url) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", table_name_sink) \
    .option("user", username) \
    .option("password", password).save()

Py4JJavaError: An error occurred while calling o144.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (host.docker.internal executor driver): java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:318)
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:346)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:796)
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:1099)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:291)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:347)
	at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:420)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:399)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:208)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:179)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:570)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:731)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1036)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1034)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:901)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:82)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:318)
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:346)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:796)
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:1099)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:291)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:347)
	at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:420)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:399)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:208)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:179)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:570)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:731)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more


In [22]:
records_translated = spark_df.toPandas()

In [23]:
records_translated

Unnamed: 0,index,Sl no,Diarrhea no,diarrhea type,Du Prohibit Office,Year,buyer_name,seller_name,Other information,List no 2
0,0,1,13217,lease,Co. D.N. Andheri 7,25/07/2023,"1) Bhavin Sheth, Authorized Trustee on behalf ...","1) Hanmant Dhanvare, Executive Engineer, Mahar...",1) Other information: Piece or parcel of land ...,https://pay2igr.igrmaharashtra.gov.in/eDisplay...
1,1,2,4286,development agreement,Co. D.N. Andheri 7,10/03/2023,1) Ajit Singh Kartar Singh Chandok,"1) Paresh Ranchhod Patel, Partner, Navish Real...","1) Other information: Land and construction, p...",https://pay2igr.igrmaharashtra.gov.in/eDisplay...
2,2,3,2449,sale deed,Co. D.N. Andheri 7,09/02/2023,1) Nupur Anil Kalke by Mukhtyar Shaila Anil Ka...,1) Harsh Satpal Malhotra,"1) Other Information: House No: 701, Floor No:...",https://pay2igr.igrmaharashtra.gov.in/eDisplay...
3,3,4,8691,66-Notice of list pendency,Co. D.N. Andheri 7,22/05/2023,1) Pradeep Soni,,1) Other Information: City Civil Court at Dind...,https://pay2igr.igrmaharashtra.gov.in/eDisplay...
4,4,5,3551,65-correction letter,Co. D.N. Andheri 7,27/02/2023,1) Gunjan Yogeet Kapoor\n2) Tushar Subhash Oberoi,1) Shama Subhash Oberoi,"1) House No: 201, Mala No: 2, Building Boat: O...",https://pay2igr.igrmaharashtra.gov.in/eDisplay...
...,...,...,...,...,...,...,...,...,...,...
1204,1204,1205,3173,live ad licenses,Co. D.N. Andheri 7,20/02/2023,1) Coelho Noala,1) Muhammad Kunhi KC,"1) Flat No:35/A, Floor No:1ST FLOOR, Building ...",https://pay2igr.igrmaharashtra.gov.in/eDisplay...
1205,1205,1206,2531,agreement,Co. D.N. Andheri 7,13/02/2023,1) Chief Sandeep Gawde on behalf of Chandresh ...,1) Rashmi Satish Kewalramani\n2) Varun Satish ...,"1) Other Information: Real Estate Project-\""Ru...",https://pay2igr.igrmaharashtra.gov.in/eDisplay...
1206,1206,1207,2863,live ad licenses,Co. D.N. Andheri 7,15/02/2023,1) Faisal Abdul Rahim Ghori,1) Jeram Chamadia (HUF) Laxman,"1) Flat No.: Garage No. 1, Floor No.: Ground, ...",https://pay2igr.igrmaharashtra.gov.in/eDisplay...
1207,1207,1208,2506,release deed,Co. D.N. Andheri 7,10/02/2023,1) Monisha Rahim Harji alias Monisha B.Charani...,1) Karim B. pasture,"1) House No: 1201, Floor No: Barawa Majla, Bui...",https://pay2igr.igrmaharashtra.gov.in/eDisplay...


In [24]:
# records_translated = spark_df.toPandas()
table_name = 'record_details_translated'
records_translated.to_sql(table_name, engine, if_exists='replace', index=True)
print(f"{len(records_translate)} DataFrame has been inserted into the '{table_name}' table.")