In [1]:
import findspark
findspark.init()

In [2]:
import numpy as np

from pyspark.sql import SparkSession
from pyspark import SparkContext as sc
from pyspark.mllib.stat import Statistics
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [3]:
spark = SparkSession.builder\
                    .appName("Assignment1")\
                    .master("local")\
                    .config('spark.executor.memory','32g')\
                    .config('spark.driver.memory','32g')\
                    .config('spark.driver.maxResultSize','2g')\
                    .getOrCreate()

df = spark.read\
    .format("jdbc")\
    .option("url", "jdbc:postgresql://localhost:5432/mernis")\
    .option("dbtable", "citizen") \
    .option("user", "postgres")\
    .option("password", "")\
    .option("driver", "org.postgresql.Driver") \
    .load()

In [4]:
df.printSchema()

root
 |-- uid: long (nullable = true)
 |-- national_identifier: string (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- mother_first: string (nullable = true)
 |-- father_first: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birth_city: string (nullable = true)
 |-- date_of_birth: string (nullable = true)
 |-- id_registration_city: string (nullable = true)
 |-- id_registration_district: string (nullable = true)
 |-- address_city: string (nullable = true)
 |-- address_district: string (nullable = true)
 |-- address_neighborhood: string (nullable = true)
 |-- street_address: string (nullable = true)
 |-- door_or_entrance_number: string (nullable = true)
 |-- misc: string (nullable = true)



In [5]:
# 数据预处理 将date_of_birth列转为datetime格式
from pyspark.sql.functions import to_date
pattern = 'd/M/y'

df = df.withColumn('date_of_birth',to_date(df['date_of_birth'],pattern))


## E1. Calculate the oldest male among all citizens of Turkey

In [6]:
# 选出男性公民
df1 = df.filter('gender = "E"')

# 选出最早的出生日期
df1.createOrReplaceTempView("view1")
maxAge = spark.sql("SELECT MIN(date_of_birth) FROM view1").rdd.keys().collect()

In [35]:
# 选出所有在此日期出生的男性公民
df1.filter(df1.date_of_birth==maxAge[0]).collect()

[Row(uid=32198722, national_identifier='52552749852', first='CELIL', last='UNAL', mother_first='HAYRIYE', father_first='MEHMET', gender='E', birth_city='BARTIN', date_of_birth=datetime.date(1329, 9, 20), id_registration_city='BARTIN', id_registration_district='AMASRA', address_city='BARTIN', address_district='AMASRA', address_neighborhood='ALIOBASI KOYU', street_address='MERKEZ SOKAK', door_or_entrance_number='66', misc='<NULL>')]

##  E2.Claculate the most frequently occurring letters in all names

In [8]:
# 选出所需统计的姓名列作为新RDD
df2 = df.select("first","last").rdd

In [11]:
# 一并统计first及last中字母出现次数
tmp = df2.flatMap(lambda x: list(x.first)+list(x.last)).countByValue()

In [28]:
# 选出结果中频数最大的items
max(tmp.items(), key=lambda x:x[1])

('A', 82319942)

## E3.The age distribution of the country's population in age groups (0-18, 19-28, 29-38, 39-48, 49-59, >60)

In [6]:
from pyspark.sql.functions import year

# 年龄计算采用虚岁，即对年份做差
now = 2021
df3 = df.select( (now - year('date_of_birth')).alias('age')).rdd

In [28]:
# 初始化年龄段分,以E1中最大年龄作为上界
maxBin = 2021 - maxAge[0].year
bins = [0,18,28,38,48,59]+[maxBin]

df3.flatMap(lambda x:x).histogram(bins)
##### TB Update --print out format

([0, 18, 28, 38, 48, 59, 692], [0, 0, 8969059, 12522777, 11069293, 16979840])

## E4.The number of men and women in the country separately and calculate the ratio of men to women

In [50]:
# 统计男女人数
df4 = df.select('gender').rdd.map(lambda x: (x,1)).reduceByKey(lambda x,y: x + y)
ans = df4.collect()

In [56]:
print('男女人数', ans)
# 计算男女比例，即男/女
print('男女比例(男/女)为', ans[0][1]/ans[1][1])

男女人数 [(Row(gender='E'), 24534483), (Row(gender='K'), 25077226)]
男女比例(男/女)为 0.9783571356736188


## E5.Calculate the highest male birth rate and the highest female birth rate in the country

In [8]:
df5 = df.select('gender', F.month('date_of_birth').alias('month'))
# 出生率最高的月份即频数最大的出生月份

In [9]:
# 统计男性出生率最高的月份
df5E = df5.filter('gender="E"').select('month').rdd.map(lambda x: (x, 1)).reduceByKey(lambda x,y: x + y)
df5E.sortBy(lambda x: x[1], ascending=False).first()

(Row(month=1), 3911255)

In [10]:
# 统计女性出生率最高的月份
df5K = df5.filter('gender="K"').select('month').rdd.map(lambda x: (x, 1)).reduceByKey(lambda x,y: x + y)
df5K.sortBy(lambda x: x[1], ascending=False).first()

(Row(month=1), 3912665)

## E6.Calculate which street has the largest population

In [15]:
df.groupBy('street_address').count().sort("count",ascending=False).first()

Row(street_address='KOYUN KENDISI', count=4748581)

## N1.Calculate the 10 most common surnames among men and women respectively

In [17]:
df1n = df.select('gender','last')
# 男性中最常见的10个姓
df1n.filter('gender="E"').groupBy('last').count().sort("count",ascending=False).limit(10).collect()

[Row(last='YILMAZ', count=352338),
 Row(last='KAYA', count=244272),
 Row(last='DEMIR', count=231289),
 Row(last='SAHIN', count=201958),
 Row(last='CELIK', count=199622),
 Row(last='YILDIZ', count=195162),
 Row(last='YILDIRIM', count=191966),
 Row(last='OZTURK', count=178610),
 Row(last='AYDIN', count=177894),
 Row(last='OZDEMIR', count=164085)]

In [18]:
# 女性中最常见的10个姓
df1n.filter('gender="K"').groupBy('last').count().sort("count",ascending=False).limit(10).collect()

[Row(last='YILMAZ', count=355954),
 Row(last='KAYA', count=244100),
 Row(last='DEMIR', count=230428),
 Row(last='SAHIN', count=202155),
 Row(last='CELIK', count=199330),
 Row(last='YILDIZ', count=194060),
 Row(last='YILDIRIM', count=192835),
 Row(last='OZTURK', count=180292),
 Row(last='AYDIN', count=178501),
 Row(last='OZDEMIR', count=165924)]

## N2.Calculate the average age of citizens in each city, statistical analysis of the degree of population aging in each city, determine whether the current city is in an aging society
A country or region is in an aging society when 10% of the total population is over 60 years old or 7% of the total population is over 65 years old

In [28]:
df2n = df.select('address_city', (now - F.year('date_of_birth')).alias('age'))

In [9]:
# 统计每个城市市民平均年龄
df2n.groupBy('address_city').avg('age').collect()

[Row(address_city='ADANA', avg(age)=52.35311124200533),
 Row(address_city='DENIZLI', avg(age)=54.73589370796754),
 Row(address_city='BALIKESIR', avg(age)=57.40910203545882),
 Row(address_city='TRABZON', avg(age)=55.07750336119169),
 Row(address_city='BILECIK', avg(age)=56.017741433462554),
 Row(address_city='GIRESUN', avg(age)=57.46172045323453),
 Row(address_city='ARTVIN', avg(age)=57.52355755469229),
 Row(address_city='ZONGULDAK', avg(age)=54.86486356333262),
 Row(address_city='ISTANBUL', avg(age)=52.04195010450876),
 Row(address_city='ERZURUM', avg(age)=52.195794399332236),
 Row(address_city='IGDIR', avg(age)=51.041278115648225),
 Row(address_city='NEVSEHIR', avg(age)=55.09681560717092),
 Row(address_city='MANISA', avg(age)=55.39183795836534),
 Row(address_city='BITLIS', avg(age)=49.462793925634905),
 Row(address_city='BURSA', avg(age)=53.89901605269567),
 Row(address_city='IZMIR', avg(age)=54.83091460690687),
 Row(address_city='VAN', avg(age)=48.501663045815654),
 Row(address_city=

In [42]:
# 统计每个城市的人口老龄化程度
def agedOrNot(x):
    # x:age
    # return: (1, more than 60, more than 65)
    if type(x)==type(1):
        if x > 65: 
            return (1,1,1)
        elif x <= 65 and x > 60:
            return (1,1,0)
        else:
            return (1,0,0)
    # 剔除无效数据
    else: return (0,0,0)

# 第一次Map返回（1, 是否大于60, 是否大于65） Reduce分别统计各个城市（总人数，60-65人数，65以上人数）第二次Map判断是否老龄化
ans = df2n.rdd.mapValues(agedOrNot).reduceByKey(lambda x,y: (x[0] + y[0],x[1] + y[1],x[2] + y[2]))\
                                          .mapValues(lambda x:(x[1]/x[0] >= 0.1) or (x[2]/x[0] >= 0.07))
ans.collect()

[('ISTANBUL', True),
 ('IZMIR', True),
 ('KONYA', True),
 ('KAHRAMANMARAS', True),
 ('MUGLA', True),
 ('MUS', True),
 ('NEVSEHIR', True),
 ('SAKARYA', True),
 ('SIIRT', True),
 ('SANLIURFA', True),
 ('YOZGAT', True),
 ('ZONGULDAK', True),
 ('KARAMAN', True),
 ('BATMAN', True),
 ('YALOVA', True),
 ('OSMANIYE', True),
 ('AFYONKARAHISAR', True),
 ('AGRI', True),
 ('AKSARAY', True),
 ('AMASYA', True),
 ('ARDAHAN', True),
 ('ARTVIN', True),
 ('TEKIRDAG', True),
 ('BURSA', True),
 ('BARTIN', True),
 ('BAYBURT', True),
 ('DUZCE', True),
 ('HATAY', True),
 ('IGDIR', True),
 ('KARABUK', True),
 ('KILIS', True),
 ('SIRNAK', True),
 ('ANKARA', True),
 ('BILECIK', True),
 ('DENIZLI', True),
 ('USAK', True),
 ('AYDIN', True),
 ('BINGOL', True),
 ('CANKIRI', True),
 ('MARDIN', True),
 ('KOCAELI', True),
 ('TUNCELI', True),
 ('BALIKESIR', True),
 ('BOLU', True),
 ('CANAKKALE', True),
 ('CORUM', True),
 ('DIYARBAKIR', True),
 ('ELAZIG', True),
 ('ERZINCAN', True),
 ('ERZURUM', True),
 ('ESKISEHIR', Tr

## N3.Calculate which 2 months have the highest concentration of population birthdays in each of the top 10 most populous cities in the country

In [6]:
# 选择城市及出生月份
df3n = df.select('address_city', F.month('date_of_birth').alias('month'))

# 计算前十大人口城市
top10 = df3n.groupBy('address_city').count().sort("count",ascending=False).limit(10).rdd.keys().collect()

# 选出在前10大人口城市中的数据
ans = df3n.filter(df3n.address_city.isin(top10)).groupBy('address_city','month').count()

In [9]:
# 用Window筛选每个城市出生月份数排前二的月份
windowTmp = Window.partitionBy('address_city').orderBy(F.desc('count'))
ans = ans.withColumn('rank', F.rank().over(windowTmp))
ans.filter(ans.rank.isin([1,2])).orderBy('address_city','rank').collect()


[Row(address_city='ADANA', month=1, count=275752, rank=1),
 Row(address_city='ADANA', month=3, count=134666, rank=2),
 Row(address_city='ANKARA', month=1, count=451654, rank=1),
 Row(address_city='ANKARA', month=3, count=318381, rank=2),
 Row(address_city='ANTALYA', month=1, count=207515, rank=1),
 Row(address_city='ANTALYA', month=3, count=134869, rank=2),
 Row(address_city='AYDIN', month=1, count=193666, rank=1),
 Row(address_city='AYDIN', month=3, count=143498, rank=2),
 Row(address_city='BURSA', month=1, count=245047, rank=1),
 Row(address_city='BURSA', month=3, count=177832, rank=2),
 Row(address_city='ISTANBUL', month=1, count=1229999, rank=1),
 Row(address_city='ISTANBUL', month=3, count=883712, rank=2),
 Row(address_city='IZMIR', month=1, count=383830, rank=1),
 Row(address_city='IZMIR', month=3, count=281419, rank=2),
 Row(address_city='KOCAELI', month=1, count=138940, rank=1),
 Row(address_city='KOCAELI', month=3, count=104624, rank=2),
 Row(address_city='KONYA', month=1, cou

## N4.Calculate the top 3 surnames in each of the top 10 most populous cities in the country and analyzing whether there is a correlation between the surnames and the cities in which they are located

In [36]:
df4n = df.select('address_city','last').filter(df.address_city.isin(top10)).groupBy('address_city','last').count()

In [37]:
windowTmp = Window.partitionBy('address_city').orderBy(F.desc('count'))
ans = df4n.withColumn('rank', F.rank().over(windowTmp))
# 筛选出每个城市的前3大姓氏
ans = ans.filter(ans.rank.isin([1,2,3])).orderBy('address_city','last')
ans.collect()

[Row(address_city='ADANA', last='DEMIR', count=11550, rank=3),
 Row(address_city='ADANA', last='KAYA', count=13187, rank=2),
 Row(address_city='ADANA', last='YILMAZ', count=16223, rank=1),
 Row(address_city='ANKARA', last='OZTURK', count=28448, rank=3),
 Row(address_city='ANKARA', last='SAHIN', count=32057, rank=2),
 Row(address_city='ANKARA', last='YILMAZ', count=47957, rank=1),
 Row(address_city='ANTALYA', last='CELIK', count=12092, rank=3),
 Row(address_city='ANTALYA', last='KAYA', count=12566, rank=2),
 Row(address_city='ANTALYA', last='YILMAZ', count=21057, rank=1),
 Row(address_city='AYDIN', last='DEMIR', count=11420, rank=3),
 Row(address_city='AYDIN', last='KAYA', count=11812, rank=2),
 Row(address_city='AYDIN', last='YILMAZ', count=14884, rank=1),
 Row(address_city='BURSA', last='AYDIN', count=19775, rank=2),
 Row(address_city='BURSA', last='OZTURK', count=17426, rank=3),
 Row(address_city='BURSA', last='YILMAZ', count=27399, rank=1),
 Row(address_city='ISTANBUL', last='DEMIR'

In [19]:
# Correlation
from pyspark.ml.stat import ChiSquareTest
from pyspark.ml.feature import ChiSqSelector,StringIndexer,OneHotEncoder

In [16]:
# 选出各城市前3大姓氏
toplast = ans.select("last").distinct().rdd.keys().collect()

In [25]:
df4n2 = df.select('address_city','last').filter(df.address_city.isin(top10)).filter(df.last.isin(toplast))
# FeatureHasher
# 编码last为features
indexer = StringIndexer(inputCol='last',outputCol='idx_last')
df4n2 = indexer.fit(df4n2).transform(df4n2)
encoder = OneHotEncoder(inputCol="idx_last",outputCol="features_last",dropLast=False)
df4n2 = encoder.fit(df4n2).transform(df4n2)

# StringIndexer
# 编码address_city为label
indexer_city = StringIndexer(inputCol='address_city',outputCol='label_city')
df4n2 = indexer_city.fit(df4n2).transform(df4n2)

In [32]:
test = ChiSquareTest.test(df4n2,'features_last','label_city',True)
test.show()

+------------+------+----------------+------------------+
|featureIndex|pValue|degreesOfFreedom|         statistic|
+------------+------+----------------+------------------+
|           4|   0.0|               9|2810.6355566718426|
|           0|   0.0|               9| 980.3022768048272|
|           1|   0.0|               9|1414.4098275084275|
|           6|   0.0|               9| 6016.043747519692|
|           3|   0.0|               9| 4874.593242869912|
|           5|   0.0|               9| 5535.835919822795|
|           2|   0.0|               9|1239.7706094808013|
+------------+------+----------------+------------------+

