In [2]:
from pyspark.sql import SparkSession

# Load the panel data from a CSV, using the header row for column names and
# letting Spark infer the column types.
# Hosted notebooks (e.g. Databricks, which the /FileStore path suggests) usually
# predefine `spark`; getOrCreate() returns that active session there and creates
# one otherwise, so this cell no longer depends on an implicit global.
spark = SparkSession.builder.getOrCreate()

file_location = "/FileStore/tables/df_panel_fix.csv"
df = (
    spark.read.format("CSV")
    .option("inferSchema", True)
    .option("header", True)
    .load(file_location)
)
display(df.take(5))

_c0,province,specific,general,year,gdp,fdi,rnr,rr,i,fr,reg,it
0,Anhui,147002.0,,1996,2093.3,50661,0.0,0.0,0.0,1128873,East China,631930
1,Anhui,151981.0,,1997,2347.32,43443,0.0,0.0,0.0,1356287,East China,657860
2,Anhui,174930.0,,1998,2542.96,27673,0.0,0.0,0.0,1518236,East China,889463
3,Anhui,285324.0,,1999,2712.34,26131,,,,1646891,East China,1227364
4,Anhui,195580.0,32100.0,2000,2902.09,31847,0.0,0.0,0.0,1601508,East China,1499110


In [3]:
# Print the first 20 rows (PySpark's default n, written out) of the inferred-schema DataFrame.
df.show(n=20)

In [4]:
# Print each column's name, type and nullability as inferred by the reader.
df.printSchema()

In [5]:
# Column names as a plain Python list of strings.
df.columns

In [6]:
# describe() only builds a new DataFrame of summary statistics (count, mean,
# stddev, min, max). It is lazy, so show() is needed to compute and print it;
# the bare expression would display just the DataFrame's schema repr.
df.describe().show()

## Setting Data Schema and Data Types

In [7]:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType

In [8]:
# Explicit schema for df_panel_fix.csv: one nullable StructField per column.
# Types follow the sample rows displayed by the first cell. specific, general, gdp,
# rnr, rr and i are written with a decimal point (e.g. 147002.0, 2093.3, 0.0), so
# they must be DoubleType; parsing such text as IntegerType yields null instead of
# the value.
# fdi, fr and it appear as whole numbers. NOTE(review): switch them to LongType if
# any value exceeds the 32-bit integer range.
data_schema = [
    StructField("_c0", IntegerType(), True),       # leading row-index column
    StructField("province", StringType(), True),
    StructField("specific", DoubleType(), True),
    StructField("general", DoubleType(), True),    # blank in some rows -> null
    StructField("year", IntegerType(), True),
    StructField("gdp", DoubleType(), True),
    StructField("fdi", IntegerType(), True),
    StructField("rnr", DoubleType(), True),
    StructField("rr", DoubleType(), True),
    StructField("i", DoubleType(), True),
    StructField("fr", IntegerType(), True),
    StructField("reg", StringType(), True),
    StructField("it", IntegerType(), True),
]

In [9]:
# Wrap the list of StructFields in the StructType that DataFrameReader.schema() expects.
final_struc = StructType(data_schema)

## Applying the Explicit Schema While Reading a CSV

In [10]:
# Re-read the CSV with the explicit schema instead of inferring types.
# header=True is still required: .schema() does not skip the first line, so without
# it the header line is parsed as a data row, producing an extra row whose numeric
# fields are null (e.g. the text "_c0" is not an integer).
df = (
    spark.read.format("CSV")
    .option("header", True)
    .schema(final_struc)
    .load(file_location)
)

In [11]:
# Confirm that the column types now come from final_struc rather than from inference.
df.printSchema()

In [12]:
# Print the first 20 rows (PySpark's default n, written out) of the explicitly typed DataFrame.
df.show(n=20)

In [13]:
# Bracket indexing returns a Column expression, not the column's data.
df['fr']

In [14]:
# Confirm that df['fr'] is a Column object.
type(df['fr'])

In [15]:
# select() returns a new single-column DataFrame; nothing is computed until an action such as show().
df.select('fr')

In [16]:
# Unlike df['fr'] (a Column), the result of select() is a DataFrame.
type(df.select('fr'))

In [17]:
# show() is an action: it computes the single-column DataFrame and prints its rows.
df.select('fr').show()

In [18]:
# head(n) collects the first n rows to the driver as a list of Row objects.
df.head(2)

In [19]:
# select() takes column names as separate arguments as well as a single list.
df.select('reg', 'fr')

## Using select with DataFrames

In [20]:
# Compute and print only the region and fiscal-revenue columns.
df.select('reg', 'fr').show()

In [21]:
# withColumn returns a new DataFrame with a copy of fr named fiscal_revenue.
# The result is not assigned, so df itself is left unchanged.
df.withColumn('fiscal_revenue',df['fr']).show()

In [22]:
# df is unchanged: the DataFrame returned by withColumn in the previous cell was never
# assigned, so no fiscal_revenue column appears in these first 20 rows.
df.show(n=20)

## Renaming Columns using withColumnRenamed

In [23]:
# withColumnRenamed also returns a new DataFrame. fr is renamed only in the printed
# result; df still has a column named fr.
df.withColumnRenamed('fr','new_fiscal_revenue').show()

## Creating New Columns from Existing Columns with withColumn

In [24]:
# Derive a column by multiplying fr by 2 (attribute access df.fr is the same Column as df['fr']).
df.withColumn('double_fiscal_revenue', df.fr * 2).show()

In [25]:
# Derive a column by adding 1 to every fr value.
df.withColumn('add_fiscal_revenue', df.fr + 1).show()

In [26]:
# Derive a column by halving fr ('/' yields a fractional result, not integer division).
df.withColumn('half_fiscal_revenue', df.fr / 2).show()

In [27]:
# Without show() only the new DataFrame's schema repr is displayed; the result is not
# assigned, so df keeps its original columns.
df.withColumn('half_fr', df.fr / 2)

## Querying a DataFrame with Spark SQL via createOrReplaceTempView

In [28]:
# Register df as a temporary view so that spark.sql() can query it by the name economic_data.
df.createOrReplaceTempView("economic_data")

In [29]:
# spark.sql() returns a DataFrame for the query result.
sql_results = spark.sql("SELECT * FROM economic_data")

In [30]:
# Displays only the DataFrame's schema repr; no rows are computed yet.
sql_results

In [31]:
# Run the query and print its rows.
sql_results.show()

In [32]:
# Filter the view to the rows whose fiscal revenue equals one specific value, then print them.
(
    spark.sql("SELECT * FROM economic_data WHERE fr=634562")
    .show()
)

This post includes code adapted from [Spark and Python for Big Data udemy course](https://udemy.com/course/spark-and-python-for-big-data-with-pyspark) and [Spark and Python for Big Data notebooks](https://github.com/SuperJohn/spark-and-python-for-big-data-with-pyspark).