In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySpark").getOrCreate()
# spark

In [2]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

import json

In [3]:
def printdf(df, l=5):
    return df.limit(l).toPandas()

def nullcount(df):
    return {col: df.filter(df[col].isNull()).count() for col in df.columns}

def shape(df):
    # df.toPandas().shape
    print((df.count(), len(df.columns)))

In [4]:
schema = StructType().add("value", StringType(), True)

raw = spark.read.csv("inputfile.txt", schema=schema)
printdf(raw)

Unnamed: 0,value
0,"name=""Jihad"" phone=01755555555 address=""BAshun..."
1,"name=""Mahmuda_Mam"" phone=0194568795 email=""mah..."
2,"name=""Naimul_Baset"" phone=01755555557 email=""n..."
3,"name=""Danish"" phone=01788888888 email=""danish@..."
4,"name=""Abir_vai"" address=""Mirpur2"" birthdate=""1..."


In [5]:
# UDF to convert 

def strToDict(line):
    return json.dumps(dict(item.split("=") for item in line.split(" ")))

convertUDF = F.udf(lambda z: strToDict(z), StringType())

# s = "name=\"Jihad\" phone=01755555555 address=\"BAshundhaara\" birthdate=\"15July\""
# strToDict(s)

In [6]:
# transform raw key-value to json

df = raw.withColumn("dict", convertUDF(F.col("value")))#.select('dict')
printdf(df)

Unnamed: 0,value,dict
0,"name=""Jihad"" phone=01755555555 address=""BAshun...","{""name"": ""\""Jihad\"""", ""phone"": ""01755555555"", ..."
1,"name=""Mahmuda_Mam"" phone=0194568795 email=""mah...","{""name"": ""\""Mahmuda_Mam\"""", ""phone"": ""01945687..."
2,"name=""Naimul_Baset"" phone=01755555557 email=""n...","{""name"": ""\""Naimul_Baset\"""", ""phone"": ""0175555..."
3,"name=""Danish"" phone=01788888888 email=""danish@...","{""name"": ""\""Danish\"""", ""phone"": ""01788888888"",..."
4,"name=""Abir_vai"" address=""Mirpur2"" birthdate=""1...","{""name"": ""\""Abir_vai\"""", ""address"": ""\""Mirpur2..."


### Convert from Json to Columns

In [7]:
df = df.select(F.col('dict'),
               F.json_tuple(F.col('dict'), 'name', 'birthdate', 'address', 'phone', 'email', 'fb_ID')
                   .alias('name', 'birthdate', 'address', 'phone', 'email', 'fb_ID'))


In [8]:
printdf(df.drop(F.col('dict')))

Unnamed: 0,name,birthdate,address,phone,email,fb_ID
0,"""Jihad""","""15July""","""BAshundhaara""",1755555555.0,,
1,"""Mahmuda_Mam""","""25August""",,194568795.0,"""mahmudamam@gmail.com""",
2,"""Naimul_Baset""","""15July""","""Gulshan""",1755555557.0,"""naimulsir@gmail.com""",naimulsir.fb.com
3,"""Danish""","""29March""","""BAshundhaara""",1788888888.0,"""danish@gmail.com""",danish.fb.com
4,"""Abir_vai""","""15July""","""Mirpur2""",,,


In [9]:
nullcount(df)

{'dict': 0,
 'name': 0,
 'birthdate': 1,
 'address': 2,
 'phone': 1,
 'email': 3,
 'fb_ID': 4}

### Experiments

In [10]:
str = "key1=value1 key2=value2 key3=value3"
str = "name=\"Jihad\" phone=01755555555 address=\"BAshundhaara\" birthdate=\"15July\""
d = dict(x.split("=") for x in str.split(" "))

print(d)

{'name': '"Jihad"', 'phone': '01755555555', 'address': '"BAshundhaara"', 'birthdate': '"15July"'}


In [11]:
schema = StructType(
    [
        StructField('k1', StringType(), True),
        StructField('k2', StringType(), True)
    ]
)

df.withColumn("b", F.from_json("a", schema))\
    .select(F.col('a'), F.col('b.*'))\
    .show()

# df = df.select(F.col('a'), 
#     F.json_tuple(F.col('a'), 'k1', 'k2', 'k3') \
#     .alias('k1', 'k2', 'k3'))

# df.schema
# df.show(truncate=False)

AnalysisException: "cannot resolve '`a`' given input columns: [phone, birthdate, email, fb_ID, address, dict, name];;\n'Project [dict#4, name#9, birthdate#10, address#11, phone#12, email#13, fb_ID#14, jsontostructs(StructField(k1,StringType,true), StructField(k2,StringType,true), 'a, Some(Europe/Berlin)) AS b#129]\n+- Project [dict#4, name#9, birthdate#10, address#11, phone#12, email#13, fb_ID#14]\n   +- Generate json_tuple(dict#4, name, birthdate, address, phone, email, fb_ID), false, [name#9, birthdate#10, address#11, phone#12, email#13, fb_ID#14]\n      +- Project [value#0, <lambda>(value#0) AS dict#4]\n         +- Relation[value#0] csv\n"

### from_json() : this converts JSON string into Struct type or Map type.

In [None]:
schema = StructType()\
    .add("name", StringType(), True)\
    .add("birthdate", StringType(), True)\
    .add("address", StringType(), True)\
    .add("phone", StringType(), True)\
    .add("email", StringType(), True)\
    .add("fb_ID", StringType(), True)

In [None]:
df.select(F.col("dict"), F.from_json("dict", schema))\
    .show()