In [61]:
#Imports
import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import col, sum
from pyspark.context import SparkContext 

In [62]:
import findspark
findspark.init()

In [63]:
#Create Spark Session
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

In [64]:
#Schema used for dataframe
schema = StructType() \
      .add("Transaction unique identifier",StringType(),True) \
      .add("Price",IntegerType(),True) \
      .add("Date of Transfer",DateType(),True) \
      .add("Postcode",StringType(),True) \
      .add("Property Type",StringType(),True) \
      .add("Old/New",StringType(),True) \
      .add("Duration",StringType(),True) \
      .add("PAON",StringType(),True) \
      .add("SOAN",IntegerType(),True) \
      .add("Street",StringType(),True) \
      .add("Locality",StringType(),True) \
      .add("Town/City",StringType(),True) \
      .add("District",StringType(),True) \
      .add("County",StringType(),True) \
      .add("PPDCategory_Type",StringType(),True) \
      .add("Record_Status - monthly_file_only",StringType(),True)

In [65]:
df = spark.read.format("csv").option("header","true").schema(schema).option("mode","DROPMALFORMED").load("C:/Users/oseda/Documents/Cloud Assignment/202304.csv")

In [43]:
df.count()


28276227

In [45]:
#df.write.csv("")
#Partition to improve performance
df.repartition(10).rdd.getNumPartitions()

10

In [66]:
#Removing columns
df = df.drop("Locality","Record_Status - monthly_file_only","PPDCategory_Type","PAON")

df.show()

+-----------------------------+------+----------------+--------+-------------+-------+--------+----+-----------------+-------------+-------------------+---------------+
|Transaction unique identifier| Price|Date of Transfer|Postcode|Property Type|Old/New|Duration|SOAN|           Street|    Town/City|           District|         County|
+-----------------------------+------+----------------+--------+-------------+-------+--------+----+-----------------+-------------+-------------------+---------------+
|         {40FD4DF2-5362-40...| 44500|      1995-02-03| SR6 0AQ|            T|      N|       F|NULL|      HOWICK PARK|   SUNDERLAND|         SUNDERLAND|  TYNE AND WEAR|
|         {7A99F89E-7D81-4E...| 56500|      1995-01-13| CO6 1SQ|            T|      N|       F|NULL| BRICK KILN CLOSE|   COLCHESTER|          BRAINTREE|          ESSEX|
|         {28225260-E61C-4E...| 58000|      1995-07-28| B90 4TG|            T|      N|       F|NULL| RAINSBROOK DRIVE|     SOLIHULL|           SOLIHULL|  W

In [67]:
expected_schema = ['Transaction unique identifier', 'Price', 'Date of Transfer', 'Postcode', 'Property Type', 'Old/New', 'Duration', 'SOAN', 'Street', 'Town/City', 'District', 'County']

if not all(col in df.columns for col in expected_schema):
    print("Schema mismatch!")
else:
    print("Schema matchs!")

Schema matchs!


In [68]:
#Checking df column types
df.dtypes

[('Transaction unique identifier', 'string'),
 ('Price', 'int'),
 ('Date of Transfer', 'date'),
 ('Postcode', 'string'),
 ('Property Type', 'string'),
 ('Old/New', 'string'),
 ('Duration', 'string'),
 ('SOAN', 'int'),
 ('Street', 'string'),
 ('Town/City', 'string'),
 ('District', 'string'),
 ('County', 'string')]

In [69]:
#Check & Remove NA values
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

#Fill NA
df = df.fillna(value=0,subset=["SOAN"])

#Drop Invalid entries
df = df.na.drop().show()

+-----------------------------+-----+----------------+--------+-------------+-------+--------+--------+------+---------+--------+------+
|Transaction unique identifier|Price|Date of Transfer|Postcode|Property Type|Old/New|Duration|    SOAN|Street|Town/City|District|County|
+-----------------------------+-----+----------------+--------+-------------+-------+--------+--------+------+---------+--------+------+
|                            0|    0|               0|   30717|            0|      0|       0|24950367|409794|        0|       0|     0|
+-----------------------------+-----+----------------+--------+-------------+-------+--------+--------+------+---------+--------+------+

+-----------------------------+------+----------------+--------+-------------+-------+--------+----+-----------------+-------------+-------------------+---------------+
|Transaction unique identifier| Price|Date of Transfer|Postcode|Property Type|Old/New|Duration|SOAN|           Street|    Town/City|           Di

In [70]:
#Check & Remove duplicate values
#df2 = df.dropDuplicates()

In [71]:
df.write.csv("data.csv")

AttributeError: 'NoneType' object has no attribute 'write'