In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession as session

In [2]:
context = pyspark.SparkContext('')
spark = session.builder.master('local').appName('RSP2').getOrCreate()

```
Data Preparation
```

In [25]:
df = spark.read.json(r"D:\other\image\file.json", multiLine = True)

In [13]:
#df.show()
# 1- flatten data, change column names, fix duplicate names (price)
# 2- set data types
# 3- remove null column -- check first if that is safe

In [26]:
# 1- flatten data
df = df.select('ad data.*', 'heading', 'location.*', 'price', 'unit')

In [15]:
#df.columns

In [16]:
# 2- change column names
#dir(df)

In [17]:
#help(df.withColumnRenamed)

In [27]:
# df = df.withColumnRenamed('السعر', 'price')
df = df.withColumnRenamed('الطابق', 'floor')
df = df.withColumnRenamed('تاريخ النشر', 'publish_date')
df = df.withColumnRenamed('تطل على', 'veiw')
df = df.withColumnRenamed('رقم الإعلان', 'ad_date')
df = df.withColumnRenamed('تطل على', 'veiw')
df = df.withColumnRenamed('سعر المتر', 'meter_price')
df = df.withColumnRenamed('سنة البناء', 'built_year')
df = df.withColumnRenamed('طريقة الدفع', 'paying_method')
df = df.withColumnRenamed('نوع التشطيب', 'finish')
df = df.withColumnRenamed('نوع المعلن', 'advertiser')
df = df.withColumnRenamed('price', '_price')

In [28]:
df = df.withColumnRenamed('السعر', 'price')

In [20]:
#df.show()

In [13]:
# reexpanding data frame
# df = df.select('ad data.*', 'heading', 'location.*', 'price', 'unit')

In [14]:
#df.columns

In [15]:
#df.show()

In [17]:
# setting data types
# dir(df.veiw) 

In [18]:
#help(df.veiw.astype)

In [29]:
df # veiw columns with types

DataFrame[null: string, price: string, floor: string, publish_date: string, veiw: string, ad_date: string, رقم الإ��لان: string, meter_price: string, built_year: string, سن�� البناء: string, paying_method: string, finish: string, advertiser: string, heading: string, level1: string, level2: string, level3: array<string>, level4: array<string>, level5: array<string>, _price: array<struct<Date:string,price:string>>, unit: string]

In [31]:
from pyspark.sql.functions import udf
# user defined function to remove commas from price string, we need that first to avoid price nulls
commaRep = udf(lambda x: x.replace(',',''))

In [33]:
df = df.withColumn('price', commaRep('price'))

In [34]:
df = df.withColumn('price', df.price.astype('int'))

In [35]:
# df.select('price').show()

+--------+
|   price|
+--------+
| 8500000|
| 3595125|
| 4588811|
| 5500000|
|  460000|
| 1255000|
| 1100000|
| 1300000|
| 4600000|
| 1638000|
| 5000000|
| 1100000|
| 1250000|
| 2911000|
| 6500000|
| 7500000|
|  850000|
| 5000000|
|  550000|
|15000000|
+--------+
only showing top 20 rows



In [None]:
# trim all spaces in columns value & remove array brackets

In [37]:
from pyspark.sql import functions

In [38]:
df = df.withColumn('level2', functions.trim(df.level2))

In [61]:
df = df.withColumn('level3', (df.level3[0]))

In [62]:
df = df.withColumn('level3', functions.trim(df.level3))

In [41]:
df = df.withColumn('unit', functions.trim(df.unit))

In [67]:
# removing brackets
df = df.withColumn('level4', (df.level4[0]))

In [77]:
df = df.withColumn('level5', (df.level5[0]))

In [74]:
df = df.withColumn('level4', functions.trim(df.level4))

In [73]:
df = df.withColumn('level4', functions.trim(df.level4))

```
Transformation
```

In [70]:
def check(unit, location):
    return df.filter('unit = {} AND level4 = {}'.format(unit, location)).show()

In [71]:
#check(location = "'وست تاون'",
#     unit = "'شقق'")
# level5 needs cleaning

+----+-------+-----+------------+-----------+----------+------------+-----------+----------+-----------+-------------+-------------+-----------+--------------------+---------------+----------------+------------------+--------+------+--------------------+----+
|null|  price|floor|publish_date|       veiw|   ad_date|رقم الإ��لان|meter_price|built_year|سن�� البناء|paying_method|       finish| advertiser|             heading|         level1|          level2|            level3|  level4|level5|              _price|unit|
+----+-------+-----+------------+-----------+----------+------------+-----------+----------+-----------+-------------+-------------+-----------+--------------------+---------------+----------------+------------------+--------+------+--------------------+----+
|null|3595125|    2|  06/02/2019|      حديقة|EG-2209528|        null|    25,318 |      2020|       null|         null|SEMI_FINISHED|مالك العقار| For sale Apartment | القاهرة الكبرى|مدينة الشيخ زايد|كمبوند بالشيخ زايد|وست

In [93]:
# see supply of locations
df.groupBy('level3').count().show() 
# location , amount of supply

+--------------------+-----+
|              level3|count|
+--------------------+-----+
|         الحي السابع|   61|
|أحياء أخرى ��ى مد...|    1|
|           بيت الوطن|  174|
|      الثورة الخضراء|    3|
|         الحي العاشر|   14|
|     الحي الثالث عشر|   18|
|                null|   35|
|  كمبوند بالشيخ زايد|  814|
|     الحي السادس عشر|   83|
|         الحي الخامس|   19|
|أحياء أخرى فى مدي...|  112|
|               جرينز|    6|
|         الحي الرابع|    3|
|     الحي الثاني عشر|    8|
|          الحي الأول|   30|
|         الحي الثامن|   71|
|         الحي الثالث|    7|
|         الحي التاسع|   32|
|         حي الياسمين|   13|
|  كمبوند في ٦ اكتوبر|    7|
+--------------------+-----+
only showing top 20 rows



In [91]:
# max of unit prices 
df.groupBy('unit', 'level3').max().show()

+---------------+--------------------+----------+
|           unit|              level3|max(price)|
+---------------+--------------------+----------+
|          أراضي|أحياء أخرى فى مدي...| 134400000|
|     شقة بحديقة|           بيت الوطن|   2150500|
|أرض مباني سكنية|           بيت الوطن|   6900000|
|            شقق|                null|   3880000|
|            شقق|           بيت الوطن|   2520000|
|            مول|                null|  27000000|
|         دوبلكس|         الحي التاسع|   3400000|
|          فيلات|أحياء أخرى فى مدي...|  11000000|
|أرض مباني سكنية|         الحي الخامس|   4500000|
|          إداري|                null|   1850000|
|أرض مباني سكنية|      الثورة الخضراء|   1730000|
|       مركز طبي|أحياء أخرى فى مدي...|   1700000|
|            شقق|         الحي الثالث|    980000|
|      مقر إداري|أحياء أخرى فى مدي...|   9600000|
|         دوبلكس|         الحي الثامن|   3150000|
|            شقق|     الحي السادس عشر|   1900000|
|            روف|         الحي الخامس|   1000000|


In [98]:
# average unit price for unit
df.groupBy('unit','level3').avg().show()

+---------------+--------------------+--------------------+
|           unit|              level3|          avg(price)|
+---------------+--------------------+--------------------+
|          أراضي|أحياء أخرى فى مدي...|3.2079555555555556E7|
|     شقة بحديقة|           بيت الوطن|  2013105.2631578948|
|أرض مباني سكنية|           بيت الوطن|           4700000.0|
|            شقق|                null|  2793333.3333333335|
|            شقق|           بيت الوطن|  1876162.0967741935|
|            مول|                null|               2.7E7|
|         دوبلكس|         الحي التاسع|           2525000.0|
|          فيلات|أحياء أخرى فى مدي...|   5493333.333333333|
|أرض مباني سكنية|         الحي الخامس|           4500000.0|
|          إداري|                null|           1850000.0|
|أرض مباني سكنية|      الثورة الخضراء|           1730000.0|
|       مركز طبي|أحياء أخرى فى مدي...|           1700000.0|
|            شقق|         الحي الثالث|   576666.6666666666|
|      مقر إداري|أحياء أخرى فى مدي...|  