In [25]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\0
        getOrCreate()


In [26]:
from typing import List, Dict
import csv
import logging
import re
csv_file_path = 'output.csv'

def get_relationship_type(relation: str, language: str = 'english') -> str:
    if language == 'english':
        if 'father' in relation.lower():
            return 'father'
        elif 'mother' in relation.lower():
            return 'mother'
        elif 'wife' in relation.lower():
            return 'wife'
        elif 'husband' in relation.lower():
            return 'husband'
        else:
            return 'other'
    elif language == 'tamil':
        if 'தந்தை' in relation:
            return 'father'
        elif 'தாய்' in relation:
            return 'mother'
        elif 'கணவர்' in relation:
            return 'husband'
        elif 'மனைவி' in relation:
            return 'wife'
        else:
            return 'other'
    elif language == 'hindi':
        if 'पिता' in relation:
            return 'father'
        elif 'माता' in relation:
            return 'mother'
        elif 'पति' in relation:
            return 'husband'
        elif 'पत्नी' in relation:
            return 'wife'
        else:
            return 'other'
    elif language == 'punjabi':
        if 'ਪਿਤਾ' in relation:
            return 'father'
        elif 'ਮਾਤਾ' in relation:
            return 'mother'
        elif 'ਪਤੀ' in relation:
            return 'husband'
        elif 'ਪਤਨੀ' in relation:
            return 'wife'
        else:
            return 'other'
    elif language == 'kannada':
        if 'ತಂದೆ' in relation:
            return 'father'
        elif 'ತಾಯಿ' in relation:
            return 'mother'
        elif 'ಸ್ವತಂತ್ರರೂಪ' in relation:
            return 'husband'
        elif 'ಸ್ವತಂತ್ರರೂಪಿನ' in relation:
            return 'wife'
        else:
            return 'other'
    elif language == 'oriya':
        if 'ପିତା' in relation:
            return 'father'
        elif 'ମାତା' in relation:
            return 'mother'
        elif 'ପତି' in relation:
            return 'husband'
        elif 'ପତିନି' in relation:
            return 'wife'
        else:
            return 'other'
    return 'other'


def get_gender(gender: str, language: str = 'english') -> str:
    if language == 'english':
        return gender.lower()
    elif language == 'tamil':
        if 'ஆண்' in gender:
            return 'male'
        elif 'பெண்' in gender:
            return 'female'
        else:
            return 'other'
    elif language == 'hindi':
        if 'पुरुष' in gender:
            return 'male'
        elif 'महिला' in gender:
            return 'female'
        else:
            return 'other'
    elif language == 'punjabi':
        if 'ਪੁਰੁ਷' in gender:
            return 'male'
        elif 'ਸਤ੍ਰੀ' in gender:
            return 'female'
        else:
            return 'other'

    elif language == 'kannada':
        if 'ಪುರುಷ' in gender:
            return 'male'
        elif 'ಸ್ತ್ರೀ' in gender:
            return 'female'
        else:
            return 'other'

    elif language == 'oriya':
        if 'ପୁରୁଷ' in gender:
            return 'male'
        elif 'ସ୍ତ୍ରୀ' in gender:
            return 'female'
        else:
            return 'other'
    return 'other'


def extract_name(lines: List[str], language='english'):
    if not lines:
        return '---', []
    name = lines[0].split(':')
    slice_index = 1
    if len(name) != 2:
        name = '---'
    else:
        name = name[1].strip()

    # check if name continues to next line
    if len(lines) > 1 and ':' not in lines[1]:
        name = name + ' ' + lines[1].strip()
        slice_index = 2
    name = re.sub('[-]+', '', name).strip()
    return name, lines[slice_index:]


def extract_relationship(lines: List[str], language='english'):
    if not lines:
        return '---', '---', []
    relation_name = lines[0].split(':')
    slice_index = 1
    if len(relation_name) != 2:
        relation_name = '---'
        relation_type = get_relationship_type(relation_name[0], language)
    else:
        relation_type = get_relationship_type(relation_name[0], language)
        relation_name = relation_name[1].strip()

    # check if name continues to next line
    if len(lines) > 1 and ':' not in lines[1]:
        relation_name = relation_name + ' ' + lines[1].strip()
        slice_index = 2
    relation_name = re.sub('[-]+', '', relation_name).strip()
    return relation_name, relation_type, lines[slice_index:]


def extract_house_number(lines: List[str], language='english'):
    if not lines:
        return '---', []
    house_number = lines[0].split(':')
    slice_index = 1
    if len(house_number) != 2:
        house_number = '---'
    else:
        house_number = house_number[1].strip()
    return house_number, lines[slice_index:]


def extract_age_and_gender(lines: List[str], language='english'):
    if not lines:
        return '---', '---'
    age = lines[0].split(':')
    if len(age) != 3:
        age = '---'
    else:
        age = age[1].strip().split(' ')[0]

    gender = lines[0].split(':')
    if len(gender) != 3:
        gender = '---'
    else:
        gender = get_gender(gender[2].strip(), language)
    return age, gender


def parse_text(text, language='english') -> dict:
    # splitlines
    lines = text.splitlines()
    # remove empty lines
    lines = [line for line in lines if line]
    # remove lines with only whitespace
    lines = [line.strip() for line in lines if line.strip()]
    voter_details = dict()
    try:
        name_info = extract_name(lines, language)
        voter_details['name'] = name_info[0]
        remaining_data = name_info[1]
        relation_name, relationship_type, remaining_data = extract_relationship(
            remaining_data, language)
        voter_details['relation_name'] = relation_name
        voter_details['relationship_type'] = relationship_type

        voter_details['house_number'], remaining_data = extract_house_number(
            remaining_data, language)

        age, gender = extract_age_and_gender(remaining_data, language)
        voter_details['age'] = age
        voter_details['gender'] = gender
        return voter_details
    except Exception as e:
        # print traceback
        logging.exception(e)
        # return {}
        raise e

In [27]:
from pyspark.sql.functions import udf,col
parseUDF = udf(lambda z: parse_text(z))

In [28]:
import os
files = os.listdir('input')
files

['input.csv']

In [37]:
for file in files:
    path = 'input/' + file
    print(f"Processing {path}")
    
    df = spark.read.option("multiLine",True).csv(path)
    df_parsed = df.select(parseUDF(col("_c3").alias("Parsed")))
    df_parsed.write.mode('overwrite').csv("output")
    df_parsed.show(5)

Processing input/input.csv
+-------------------------+
|<lambda>(_c3 AS `Parsed`)|
+-------------------------+
|     {gender=ஆண்‌, rel...|
|     {gender=ஆண்‌, rel...|
|     {gender=பெண்‌, re...|
|     {gender=பெண்‌, re...|
|     {gender=பெண்‌, re...|
+-------------------------+
only showing top 5 rows

