In [22]:
#import PySpark
from pyspark.sql import SparkSession

import pandas as pd

In [9]:
# Build the SparkSession used by every cell below; getOrCreate() hands back
# an already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName('SparkDemo')
    .getOrCreate()
)

In [32]:
# Column names applied to the demo DataFrame.
columns = ["language", "users_count"]

# One tuple per row; the counts are kept as strings, so Spark infers a
# string column for users_count.
data = [
    ("Java", "20000"),
    ("Python", "100000"),
    ("Scala", "3000"),
]

In [33]:
# Build the Spark DataFrame from the row tuples, naming the columns directly
# via the schema argument (column types are still inferred from the data).
df = spark.createDataFrame(data, schema=columns)


In [34]:
# Print the DataFrame's rows as a text table (at most 20 rows by default).
df.show()

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



In [36]:
# Up to 50 rows, default truncation of long values.
df.show(n=50)

# Default 20 rows, full column values (no truncation).
df.show(n=20, truncate=False)

# Up to 50 rows, full column values.
df.show(n=50, truncate=False)

# 20 rows, values cut at 20 characters, one record per block (vertical layout).
df.show(n=20, truncate=20, vertical=True)


+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+

+--------+-----------+
|language|users_count|
+--------+-----------+
|Java    |20000      |
|Python  |100000     |
|Scala   |3000       |
+--------+-----------+

+--------+-----------+
|language|users_count|
+--------+-----------+
|Java    |20000      |
|Python  |100000     |
|Scala   |3000       |
+--------+-----------+

-RECORD 0-------------
 language    | Java   
 users_count | 20000  
-RECORD 1-------------
 language    | Python 
 users_count | 100000 
-RECORD 2-------------
 language    | Scala  
 users_count | 3000   



In [35]:
# Print the schema as a tree: column name, data type and nullability.
df.printSchema()

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)



In [26]:
# (column name, data type) pairs for the Spark DataFrame — both are strings here.
df.dtypes

[('language', 'string'), ('users_count', 'string')]

In [24]:
# Same rows as a pandas DataFrame for comparison; no column names are passed,
# so pandas labels the columns 0 and 1.
pd_df = pd.DataFrame(data)

In [25]:
# Per-column pandas dtypes; the string values show up as "object" in the output.
pd_df.dtypes

0    object
1    object
dtype: object

In [37]:
# reading a csv

In [38]:
# Check the kernel's working directory; relative file paths resolve against it.
import os
os.getcwd()

'/Users/abhinandanmahata/Pyspark'

In [42]:
# Load the startups CSV, taking the column names from its first line.
# The file is addressed relative to the working directory (as the
# input_real_estate.txt read does) instead of a user-specific absolute path,
# so the notebook also runs on other machines; header is passed as a real boolean.
df2 = spark.read.csv("50_Startups.csv", header=True)

In [56]:
df2.show(2)

+---------+--------------+---------------+----------+---------+
|R&D Spend|Administration|Marketing Spend|     State|   Profit|
+---------+--------------+---------------+----------+---------+
| 165349.2|      136897.8|       471784.1|  New York|192261.83|
| 162597.7|     151377.59|      443898.53|California|191792.06|
+---------+--------------+---------------+----------+---------+
only showing top 2 rows



In [61]:
#to check datatype for any object/variable
# type() reports the Python class of any object — here the DataFrame read from the CSV.
type(df2)

pyspark.sql.dataframe.DataFrame

In [92]:
# Read the real-estate text file with the CSV reader's default options. No header
# is requested, so Spark auto-names the column (_c0); the fields are separated by
# "|" rather than ",", so the whole content lands in that single column.
df_wodel = spark.read.csv('input_real_estate.txt')

In [99]:
# truncate=False prints the full (very long) _c0 value instead of cutting it off.
df_wodel.show(truncate= False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0                                                                                                                                                                                                                                                                                         |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Property_ID|Location|Price|Bedrooms|Bathrooms|Size|Price_SQ_FT|Status|1461262|Arroyo Grande|795000|3|3|2371|365.3|Short Sale|1478004|Paulo

In [100]:
# Spark SQL column functions used below: regex replacement, string splitting,
# and array-to-rows expansion.
from pyspark.sql.functions import regexp_replace, split, explode


In [124]:
# Insert a "-" marker after every 8th "|" of _c0 and store the result in a new
# column "chk". The regex matches eight consecutive pipe-terminated fields and
# "$0-" writes the match back followed by "-", so the pipe itself is kept
# (e.g. "...|Status|-1461262|...") and record boundaries become "|-".
df_wodel1= df_wodel.withColumn("chk", regexp_replace("_c0","(.*?\\|){8}","$0-"))

In [123]:
# Show only the marked-up "chk" column, untruncated.
df_wodel1.select("chk").show(truncate= False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|chk                                                                                                                                                                                                                                                                                             |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Property_ID|Location|Price|Bedrooms|Bathrooms|Size|Price_SQ_FT|Status|-1461262|Arroyo Grande|795000|3|3|2371|365.3|Short Sale|

# explode into multiple rows with single column

In [140]:
# Split "chk" on the "|-" record marker and explode the resulting array so that
# each record becomes its own row in a single column "col_explode".
# The pattern is a raw string: "\|" inside a normal string literal is an invalid
# escape sequence that newer Python versions warn about.
df_wodel2 = (
    df_wodel1
    .withColumn("col_explode", explode(split("chk", r"\|-")))
    .select("col_explode")
)

In [144]:
# One row per record now: the header line first, then the property records.
df_wodel2.show(truncate = False)

+---------------------------------------------------------------------+
|col_explode                                                          |
+---------------------------------------------------------------------+
|Property_ID|Location|Price|Bedrooms|Bathrooms|Size|Price_SQ_FT|Status|
|1461262|Arroyo Grande|795000|3|3|2371|365.3|Short Sale               |
|1478004|Paulo Pablo|399000|4|3|2818|163.59|Short Sale                |
|1486551|Paulo Pablo|545000|4|3|3032|179.75|Short Sale                |
|1492832|Santa Bay|909000|4|4|3540|286.78|Short Sale                  |
+---------------------------------------------------------------------+



In [150]:
# Pull out one field (index 1, the Location value) by splitting each record on "|".
# This displays a single column by hand; the RDD-based split further down handles
# all columns at once. The raw string avoids the invalid "\|" escape sequence.
df_wodel2.select(split("col_explode", r"\|")[1]).show()

+-----------------------------+
|split(col_explode, \|, -1)[1]|
+-----------------------------+
|                     Location|
|                Arroyo Grande|
|                  Paulo Pablo|
|                  Paulo Pablo|
|                    Santa Bay|
+-----------------------------+



# convert the DataFrame to an RDD and apply split

In [152]:
# .rdd exposes the single-column DataFrame as an RDD of Row objects.
rdd = df_wodel2.select("col_explode").rdd

In [155]:
# Confirm the object is now an RDD rather than a DataFrame.
type(rdd)

pyspark.rdd.RDD

In [167]:
# Preview: split every record string (the Row's only field) on "|" and pull
# the resulting lists back to the driver.
rdd.map(lambda row: row[0].split("|")).collect()

[['Property_ID',
  'Location',
  'Price',
  'Bedrooms',
  'Bathrooms',
  'Size',
  'Price_SQ_FT',
  'Status'],
 ['1461262',
  'Arroyo Grande',
  '795000',
  '3',
  '3',
  '2371',
  '365.3',
  'Short Sale'],
 ['1478004',
  'Paulo Pablo',
  '399000',
  '4',
  '3',
  '2818',
  '163.59',
  'Short Sale'],
 ['1486551',
  'Paulo Pablo',
  '545000',
  '4',
  '3',
  '3032',
  '179.75',
  'Short Sale'],
 ['1492832', 'Santa Bay', '909000', '4', '4', '3540', '286.78', 'Short Sale']]

In [162]:
# Keep the split records as a new RDD (one list of field strings per record).
rdd1 = rdd.map(lambda row: row[0].split("|"))

In [172]:
# Turn the lists into a DataFrame; with no names given the columns come out as
# _1 .. _8 and the header record is still an ordinary data row.
rdd1.toDF().show()

+-----------+-------------+------+--------+---------+----+-----------+----------+
|         _1|           _2|    _3|      _4|       _5|  _6|         _7|        _8|
+-----------+-------------+------+--------+---------+----+-----------+----------+
|Property_ID|     Location| Price|Bedrooms|Bathrooms|Size|Price_SQ_FT|    Status|
|    1461262|Arroyo Grande|795000|       3|        3|2371|      365.3|Short Sale|
|    1478004|  Paulo Pablo|399000|       4|        3|2818|     163.59|Short Sale|
|    1486551|  Paulo Pablo|545000|       4|        3|3032|     179.75|Short Sale|
|    1492832|    Santa Bay|909000|       4|        4|3540|     286.78|Short Sale|
+-----------+-------------+------+--------+---------+----+-----------+----------+



In [173]:
# Keep the DataFrame built from the split records for the cells below.
rdd1_df=rdd1.toDF()

In [211]:
# rdd1_df_header was never assigned in the cells above, so this cell failed with a
# NameError on a fresh kernel. Take the first row of rdd1_df explicitly (the
# header record in the output shown for rdd1_df) before inspecting its type.
rdd1_df_header = rdd1_df.first()
type(rdd1_df_header)

pyspark.sql.types.Row

# to select particular row/column
## dataframe.select([columns]).collect()[index]

In [234]:
# collect() brings all rows to the driver as a list of Row objects; [0] picks the first one.
temp= rdd1_df.collect()[0]

In [235]:
# A single collected row is a Row object, not a DataFrame.
type(temp)

pyspark.sql.types.Row

In [236]:
# Build a one-column DataFrame from the values of the Row held in temp.
# parallelize(list(temp)) produced bare strings, and toDF() then failed with
# "TypeError: Can not infer schema for type: <class 'str'>" because every RDD
# element must be row-like. Wrapping each value in a 1-tuple fixes that.
# NOTE(review): this rebinds the name df used for the demo DataFrame earlier on.
df = spark.sparkContext.parallelize([(value,) for value in temp]).toDF()

In [232]:
# Select the first two columns by name and display them.
# NOTE(review): the AttributeError printed under this cell mentions .collect() on a
# list, which this line never calls — the output looks stale from an earlier
# version of the cell; re-run to refresh it.
rdd1_df.select(*rdd1_df.columns[:2]).show()

AttributeError: 'list' object has no attribute 'collect'

In [207]:
# Display the full split-out table again (header record still in the first row).
rdd1_df.show()

+-----------+-------------+------+--------+---------+----+-----------+----------+
|         _1|           _2|    _3|      _4|       _5|  _6|         _7|        _8|
+-----------+-------------+------+--------+---------+----+-----------+----------+
|Property_ID|     Location| Price|Bedrooms|Bathrooms|Size|Price_SQ_FT|    Status|
|    1461262|Arroyo Grande|795000|       3|        3|2371|      365.3|Short Sale|
|    1478004|  Paulo Pablo|399000|       4|        3|2818|     163.59|Short Sale|
|    1486551|  Paulo Pablo|545000|       4|        3|3032|     179.75|Short Sale|
|    1492832|    Santa Bay|909000|       4|        4|3540|     286.78|Short Sale|
+-----------+-------------+------+--------+---------+----+-----------+----------+

