# RealTimeScenario - Removing '"' in jsonString

In [0]:
jsonstring = """{"id":1,"name":"uday"kiran","city":"vizag"}"""

data = [(1,jsonstring)]
cols = ["col1","col2"]

df=spark.createDataFrame(data,cols)
display(df)

col1,col2
1,"{""id"":1,""name"":""uday""kiran"",""city"":""vizag""}"


In [0]:
from pyspark.sql.functions import split,col,lit,concat_ws,concat

df=df.withColumn("col3",split(col("col2"),'"name":"')[0])\
     .withColumn("col4",lit('"name":"'))\
     .withColumn("col5",split(col("col2"),'"name":"')[1])

df=df.withColumn("col6",split("col5",'"',2))\
     .withColumn("col7",concat_ws('',col("col6")))

df=df.withColumn("col8",concat(col("col3"),col("col4"),col("col7"))).select(col("col2"),col("col8"))

display(df)
                                       

col2,col8
"{""id"":1,""name"":""uday""kiran"",""city"":""vizag""}","{""id"":1,""name"":""udaykiran"",""city"":""vizag""}"


# Check if field Exists

In [0]:
fieldnames=df.schema.fieldNames()
count = fieldnames.count('col8')

if count>0:
    print("column exits")
    
else:
    print("column doesnt exists")

column exits


# Convert pyspark dataframe to pandas dataframe (toPandas)

In [0]:
data = [(1,'uday',80000),(2,'sandy',50000)]
schema = ['id','name','salary']

df = spark.createDataFrame(data,schema)
display(df)
print(type(df))

id,name,salary
1,uday,80000
2,sandy,50000


<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:
df_pandas=df.toPandas()
print(df_pandas)
print(type(df_pandas))

   id   name  salary
0   1   uday   80000
1   2  sandy   50000
<class 'pandas.core.frame.DataFrame'>


#Different ways to apply function on Column in Dataframe using PySpark

In [0]:
details = [{'id':1,'name':'uday','age':24},
           {'id':2,'name':'sandy','age':23}]

df=spark.createDataFrame(details)
display(df)

age,id,name
24,1,uday
23,2,sandy


In [0]:
from pyspark.sql.functions import upper
display(df)

df1=df.withColumn('name',upper(df.name))
df1.show()

df2 = df.select('id',upper('name').alias('Name'),'age')
df2.show()

df.createOrReplaceTempView('details')
df3=spark.sql('select id, upper(name) as Name,age from details')
df3.show()

def uppername(df):
    return df.withColumn('name',upper('name'))

df.transform(uppername).show()

age,id,name
24,1,uday
23,2,sandy


+---+---+-----+
|age| id| name|
+---+---+-----+
| 24|  1| UDAY|
| 23|  2|SANDY|
+---+---+-----+

+---+-----+---+
| id| Name|age|
+---+-----+---+
|  1| UDAY| 24|
|  2|SANDY| 23|
+---+-----+---+

+---+-----+---+
| id| Name|age|
+---+-----+---+
|  1| UDAY| 24|
|  2|SANDY| 23|
+---+-----+---+

+---+---+-----+
|age| id| name|
+---+---+-----+
| 24|  1| UDAY|
| 23|  2|SANDY|
+---+---+-----+



# printSchema() to string or json in PySpark

In [0]:
data = [(1,'uday',80000),(2,'sandy',50000)]
schema = ['id','name','salary']

df = spark.createDataFrame(data,schema)
x=df.printSchema()
print(x)

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)

None


In [0]:
s=df.schema.simpleString()
print(type(s))

j = df.schema.jsonValue()
print(j)
print(type(j))

<class 'str'>
{'type': 'struct', 'fields': [{'name': 'id', 'type': 'long', 'nullable': True, 'metadata': {}}, {'name': 'name', 'type': 'string', 'nullable': True, 'metadata': {}}, {'name': 'salary', 'type': 'long', 'nullable': True, 'metadata': {}}]}
<class 'dict'>


# Write Dataframe as single file with specific name

In [0]:
data = [(1,'uday','IT',50000),
        (2,'sandy','HR',55000),
        (3,'kalyan','payroll',52000),
        (4,'rafi','IT',55000)]

schema = ['id','name','department','salary']

df = spark.createDataFrame(data,schema)
df.show()

+---+------+----------+------+
| id|  name|department|salary|
+---+------+----------+------+
|  1|  uday|        IT| 50000|
|  2| sandy|        HR| 55000|
|  3|kalyan|   payroll| 52000|
|  4|  rafi|        IT| 55000|
+---+------+----------+------+



In [0]:
df.coalesce(1).write.csv(path='/FileStore/singlepartfiledata',header=True)

In [0]:
spark.read.csv('/FileStore/singlepartfiledata/part-00000-tid-1616238833549315438-a9757733-d223-454d-a176-a09553467313-384-1-c000.csv',header=True).show()

+---+------+----------+------+
| id|  name|department|salary|
+---+------+----------+------+
|  1|  uday|        IT| 50000|
|  2| sandy|        HR| 55000|
|  3|kalyan|   payroll| 52000|
|  4|  rafi|        IT| 55000|
+---+------+----------+------+



In [0]:
filenames=dbutils.fs.ls('/FileStore/singlepartfiledata')
name = ''

for filename in filenames:
    if filename.name.endswith('.csv'):
        name = filename.name

print(name)

part-00000-tid-1616238833549315438-a9757733-d223-454d-a176-a09553467313-384-1-c000.csv


In [0]:
dbutils.fs.cp('/FileStore/singlepartfiledata/' + name,'/FileStore/data1/persons.csv')

Out[45]: True

In [0]:
spark.read.csv('/FileStore/data1/persons.csv',header=True).show()

+---+------+----------+------+
| id|  name|department|salary|
+---+------+----------+------+
|  1|  uday|        IT| 50000|
|  2| sandy|        HR| 55000|
|  3|kalyan|   payroll| 52000|
|  4|  rafi|        IT| 55000|
+---+------+----------+------+



In [0]:
dbutils.fs.rm('/FileStore/singlepartfiledata',recurse = True)

Out[49]: True