In [44]:
from pyspark.sql.types import StructField,StructType,DateType,DoubleType,IntegerType,StringType
from pyspark.sql.functions import year,round

gdpschema = StructType([
    StructField("country", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
])

schema = StructType([
    StructField("date", DateType()),
    StructField("Price", DoubleType()),
])

gdp = spark.read.csv("gdpCleaned.csv",header=True,mode="DROPMALFORMED",schema=gdpschema)
oil = spark.read.csv("oil.csv", header=True, mode="DROPMALFORMED",schema=schema)
gas = spark.read.csv("natgas.csv",header=True,mode="DROPMALFORMED", schema=schema)

oil = oil.withColumnRenamed("Price","oilPrice")
gas = gas.withColumnRenamed("Price","gasPrice")

joinedData = oil.join(gas,oil.date==gas.date,'outer').drop(gas.date)

joinedData = joinedData.select(year(joinedData.date).alias("year"),joinedData.oilPrice,joinedData.gasPrice)

joinedData = (joinedData.groupBy("year").avg("oilPrice","gasPrice")
                        .withColumnRenamed("avg(oilPrice)","oilPrice")
                        .withColumnRenamed("avg(gasPrice)","gasPrice"))

oilngasData = (joinedData.select(
                                "year",
                                round("oilPrice",3).alias("oilPrice"),
                                round("gasPrice",3).alias("gasPrice")).orderBy("year"))

gdp = gdp.join(oilngasData,gdp.year==oilngasData.year,'left').drop(oilngasData.year).orderBy("country","year")

gdp = gdp.coalesce(1)

gdp.write.csv("gdpCleanedD.csv", header=True)

In [19]:
from pyspark.sql.types import StructField,StructType,DateType,DoubleType,IntegerType,StringType
from pyspark.sql.functions import round

gdpschema = StructType([
    StructField("country", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType())
])

schema = StructType([
    StructField("country", StringType()),
    StructField("year", IntegerType()),
    StructField("workingPop", DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",header=True,mode="DROPMALFORMED",schema=gdpschema)
work = spark.read.csv("workPop.csv", header=True, mode="DROPMALFORMED",schema=schema)
gdp = gdp.join(work,(gdp.year==work.year)&(gdp.country==work.country),"left").drop(work.year).drop(work.country)

gdp = gdp.select("country","year","gdp","gdpNote","oilPrice","gasPrice",round("workingPop",4).alias("workingPop")).orderBy("country","year")

gdp = gdp.coalesce(1)

gdp.write.csv("gdpCleanedD.csv", header=True)

In [176]:
from pyspark.sql.types import StructField,StructType,DateType,DoubleType,IntegerType,StringType

schema = StructType([
    StructField("country", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
])

countrySch = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType())
])

gdp = spark.read.csv("final.csv",header=True,mode="DROPMALFORMED",schema=schema)

countryCode = spark.read.csv("countryCode.csv",header=True,mode="DROPMALFORMED",schema=countrySch)

gdp = gdp.join(countryCode,gdp["country"]==countryCode["country"],"inner").drop(countryCode["country"])

gdp = gdp.select("country","cleanedName","countryCode","year","gdp","gdpNote","debt","inflation","unemployment",\
                 "lendingInterest","techExport","totalReserves","fdi","cropProduction","oilPrice","gasPrice",\
                "workingPop")

gdp.write.csv("gdpCleanedD.csv", header=True)

In [177]:
from pyspark.sql.types import StructField,StructType,DateType,DoubleType,IntegerType,StringType

schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
])


exchange_rate_schema = StructType([
    StructField("country", StringType()),  
    StructField("year", IntegerType()),
    StructField("exchangeRate", DoubleType())
])


gdp = spark.read.csv("gdpCleaned.csv",header=True,mode="DROPMALFORMED",schema=schema)
excRate = spark.read.csv("excRate.csv",header=True,mode="DROPMALFORMED",schema=exchange_rate_schema)

gdp = gdp.join(excRate, (gdp.countryCode==excRate.country) & (gdp.year==excRate.year), "left").drop(excRate.year).drop(excRate.country)

gdp.write.csv("gdpCleanedD.csv", header=True)

In [178]:
from pyspark.sql.types import StructField,StructType,DateType,DoubleType,IntegerType,StringType

schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType())
])

oilProd = StructType([
    StructField("country", StringType()),  
    StructField("year", IntegerType()),
    StructField("oilProd", DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",header=True,mode="DROPMALFORMED",schema=schema)
oilProd = spark.read.csv("oilProd.csv",header=True,mode="DROPMALFORMED",schema=oilProd)

gdp = gdp.join(oilProd, (gdp.countryCode==oilProd.country) & (gdp.year==oilProd.year), "left").drop(oilProd.year).drop(oilProd.country)

gdp.write.csv("gdpCleanedD.csv", header=True)

In [None]:
from pyspark.sql.types import StructField,StructType,DateType,DoubleType,IntegerType,StringType

schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType())
])

crudeOil = StructType([
    StructField("country", StringType()),  
    StructField("year", IntegerType()),
    StructField("oilImportPrice", DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",header=True,mode="DROPMALFORMED",schema=schema)
crudeOil = spark.read.csv("oilImportPrice.csv",header=True,mode="DROPMALFORMED",schema=crudeOil)

gdp = gdp.join(crudeOil, (gdp.countryCode==crudeOil.country) & (gdp.year==crudeOil.year), "left").drop(crudeOil.year).drop(crudeOil.country)

gdp.write.csv("gdpCleanedD.csv", header=True)

In [183]:
from pyspark.sql.types import StructField,StructType,DateType,DoubleType,IntegerType,StringType

schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType())
])

ppiSch = StructType([
    StructField("country", StringType()),  
    StructField("year", IntegerType()),
    StructField("ppp",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",header=True,mode="DROPMALFORMED",schema=schema)
ppp = spark.read.csv("ppp.csv",header=True,mode="DROPMALFORMED",schema=ppiSch)

gdp = gdp.join(ppp, (gdp.countryCode==ppp.country) & (gdp.year==ppp.year), "left").drop(ppp.year).drop(ppp.country)

gdp.write.csv("gdpCleanedD.csv", header=True)

In [186]:
from pyspark.sql.types import StructField,StructType,DateType,DoubleType,IntegerType,StringType

schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppi",DoubleType())
])

res = StructType([
    StructField("country", StringType()),  
    StructField("year", IntegerType()),
    StructField("researcherPer1k",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",header=True,mode="DROPMALFORMED",schema=schema)
research = spark.read.csv("researcherPer1k.csv",header=True,mode="DROPMALFORMED",schema=res)

gdp = gdp.join(research, (gdp.countryCode==research.country) & (gdp.year==research.year), "left").drop(research.year).drop(research.country)

gdp.write.csv("gdpCleanedD.csv", header=True)

In [204]:
from pyspark.sql.types import StructField,StructType,DateType,DoubleType,IntegerType,StringType

schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType())
])

ppi = StructType([
    StructField("country", StringType()), 
    StructField("Type", StringType()), 
    StructField("year", IntegerType()),  
    StructField("ppi", DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",header=True,mode="DROPMALFORMED",schema=schema)
ppi = spark.read.csv("ppi.csv",header=True,mode="DROPMALFORMED",schema=ppi)

ppi = ppi.filter(ppi.Type=="AGRWTH").drop(ppi.Type)
ppi = ppi.groupBy("country","year").avg("ppi").alias("ppi")

gdp = gdp.join(ppi, (gdp.countryCode==ppi.country) & (gdp.year==ppi.year), "left").drop(ppi.year).drop(ppi.country)

gdp.write.csv("gdpCleanedD.csv", header=True)

In [212]:
from pyspark.sql.types import StructField,StructType,DateType,DoubleType,IntegerType,StringType

schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType())
])

indProd = StructType([
    StructField("country", StringType()), 
    StructField("Type", StringType()), 
    StructField("year", IntegerType()),  
    StructField("totalIndProd", DoubleType())
])


gdp = spark.read.csv("gdpCleaned.csv",header=True,mode="DROPMALFORMED",schema=schema)
indProd = spark.read.csv("indProd.csv",header=True,mode="DROPMALFORMED",schema=indProd)
indProd = indProd.filter(indProd.Type=="TOT")#.drop("Type")

gdp = gdp.join(indProd, (gdp.countryCode==indProd.country) & (gdp.year==indProd.year), "left").drop(indProd.year).drop(indProd.country)

gdp.write.csv("gdpCleanedD.csv", header=True)

In [221]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType())
])

sharePrice = StructType([
    StructField("country", StringType()), 
    StructField("Type", StringType()), 
    StructField("year", IntegerType()),  
    StructField("sharePrice", DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",header=True,mode="DROPMALFORMED",schema=schema)
sharePrice = spark.read.csv("sharePrice.csv",header=True,mode="DROPMALFORMED",schema=sharePrice)
sharePrice = sharePrice.filter(sharePrice.Type=="A").drop("Type")

gdp = gdp.join(sharePrice,(gdp.countryCode==sharePrice.country)&(gdp.year==sharePrice.year),"left").drop(sharePrice.country).drop(sharePrice.year)

gdp.coalesce(1).write.csv("gdpCleaneD",header=True)

In [230]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType())
])

pollution_schema = StructType([
    StructField("country", StringType()),
    StructField("year", IntegerType()),
    StructField("pollution", DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",header=True,mode="DROPMALFORMED",schema=schema)
pollution = spark.read.csv("pollution.csv",header="True",mode="DROPMALFORMED",schema=pollution_schema)

pollution=pollution.groupBy("country","year").avg("pollution").alias("pollution")


gdp = gdp.join(pollution,(gdp.countryCode==pollution.country)&(gdp.year==pollution.year),"left").drop(pollution.country).drop(pollution.year)

gdp = gdp.withColumnRenamed("avg(pollution)","pollution")

gdp.coalesce(1).write.csv("gdpCleanedData",header="True")

In [237]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType())
])

narrow_money = StructType([
    StructField("country",StringType()),
    StructField("Type",StringType()),
    StructField("year",IntegerType()),
    StructField("narrowMoney",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)
narrow = spark.read.csv("narrowMoney.csv",mode="DROPMALFORMED",header=True,schema=narrow_money)
narrow = narrow.filter(narrow.Type=="A").drop(narrow.Type)

gdp = gdp.join(narrow,(gdp.countryCode==narrow.country)&(gdp.year==narrow.year),"left").drop(narrow.country).drop(narrow.year)

gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

In [241]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType()),
    StructField("narrowMoneyM1",DoubleType())
])

broad_money = StructType([
    StructField("country",StringType()),
    StructField("Type",StringType()),
    StructField("year",IntegerType()),
    StructField("broadMoney",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)
broad = spark.read.csv("broadMoney.csv",mode="DROPMALFORMED",header=True,schema=broad_money)
broad = broad.filter(broad.Type=="A").drop(broad.Type)

print broad.count()
print broad.select("country","year").distinct().count()

gdp = gdp.join(broad,(gdp.countryCode==broad.country)&(gdp.year==broad.year),"left").drop(broad.country).drop(broad.year)
gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

1174
1174


In [247]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType()),
    StructField("narrowMoneyM1",DoubleType()),
    StructField("broadMoney",DoubleType()) 
])

ints = StructType([
    StructField("country",StringType()),
    StructField("types",StringType()),
    StructField("year",IntegerType()),
    StructField("longTermInterest",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)
longterm = spark.read.csv("longTermInterest.csv",mode="DROPMALFORMED",header=True,schema=ints)
longterm = longterm.filter(longterm.types=="A").drop(longterm.types)

gdp = gdp.join(longterm,(gdp.countryCode==longterm.country)&(gdp.year==longterm.year),"left").drop(longterm.country).drop(longterm.year)

gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

In [251]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType()),
    StructField("narrowMoneyM1",DoubleType()),
    StructField("broadMoney",DoubleType()),
    StructField("longTermInterest",DoubleType())
])

ints = StructType([
    StructField("country",StringType()),
    StructField("types",StringType()),
    StructField("year",IntegerType()),
    StructField("shortTermInterest",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)
shortTerm = spark.read.csv("shortTermIntereset.csv",mode="DROPMALFORMED",header=True,schema=ints)
shortTerm = shortTerm.filter(shortTerm.types=="A").drop(shortTerm.types)

gdp = gdp.join(shortTerm,(gdp.countryCode==shortTerm.country)&(gdp.year==shortTerm.year),"left").drop(shortTerm.country).drop(shortTerm.year)

gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

In [279]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType()),
    StructField("narrowMoneyM1",DoubleType()),
    StructField("broadMoney",DoubleType()),
    StructField("longTermInterest",DoubleType()),
    StructField("shortTermInterest",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)
trade = (sc.textFile("importToExportRatio.csv")
           .map(lambda text:text.split(';')))

header= trade.first()
trade = trade.filter(lambda line:line!=header).toDF(header)
trade = (trade.withColumnRenamed("Country or Area","country")
              .withColumnRenamed("Value","importExportRatio"))

gdp = gdp.join(trade,(gdp.cleanedName==trade.country)&(gdp.year==trade.Year),"left").drop(trade.country).drop(trade.Year)

gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

In [286]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType()),
    StructField("narrowMoneyM1",DoubleType()),
    StructField("broadMoney",DoubleType()),
    StructField("longTermInterest",DoubleType()),
    StructField("shortTermInterest",DoubleType()),
    StructField("importExportRatio",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)

newBusiness = (sc.textFile("newBusiness.csv")
           .map(lambda text:text.split(';')))

header= newBusiness.first()
newBusiness = newBusiness.filter(lambda line:line!=header).toDF(header)
newBusiness = (newBusiness.withColumnRenamed("Country or Area","country")
                          .withColumnRenamed("Value","newBusinessDiff"))

gdp = gdp.join(newBusiness,(gdp.cleanedName==newBusiness.country)&(gdp.year==newBusiness.Year),"left").drop(newBusiness.country).drop(newBusiness.Year)
gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

In [289]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType()),
    StructField("narrowMoneyM1",DoubleType()),
    StructField("broadMoney",DoubleType()),
    StructField("longTermInterest",DoubleType()),
    StructField("shortTermInterest",DoubleType()),
    StructField("importExportRatio",DoubleType()),
    StructField("newBusinessDiff",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)

naturalGasImport = (sc.textFile("naturalGasImport.csv")
           .map(lambda text:text.split(';')))

header= naturalGasImport.first()
naturalGasImport = naturalGasImport.filter(lambda line:line!=header).toDF(header)
naturalGasImport = (naturalGasImport.withColumnRenamed("Country or Area","country")
                                    .withColumnRenamed("Quantity","naturalGasImport"))

gdp = gdp.join(naturalGasImport,(gdp.cleanedName==naturalGasImport.country)&(gdp.year==naturalGasImport.Year),"left").drop(naturalGasImport.country).drop(naturalGasImport.Year)

gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

In [292]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType()),
    StructField("narrowMoneyM1",DoubleType()),
    StructField("broadMoney",DoubleType()),
    StructField("longTermInterest",DoubleType()),
    StructField("shortTermInterest",DoubleType()),
    StructField("importExportRatio",DoubleType()),
    StructField("newBusinessDiff",DoubleType()),
    StructField("naturalGasImport",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)

naturalGasExport = (sc.textFile("naturalGasExport.csv")
           .map(lambda text:text.split(';')))

header= naturalGasExport.first()
naturalGasExport = naturalGasExport.filter(lambda line:line!=header).toDF(header)
naturalGasExport = (naturalGasExport.withColumnRenamed("Country or Area","country")
                                    .withColumnRenamed("Quantity","naturalGasExport"))

gdp = gdp.join(naturalGasExport,(gdp.cleanedName==naturalGasExport.country)&(gdp.year==naturalGasExport.Year),"left").drop(naturalGasExport.country).drop(naturalGasExport.Year)

gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

In [300]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType()),
    StructField("narrowMoneyM1",DoubleType()),
    StructField("broadMoney",DoubleType()),
    StructField("longTermInterest",DoubleType()),
    StructField("shortTermInterest",DoubleType()),
    StructField("importExportRatio",DoubleType()),
    StructField("newBusinessDiff",DoubleType()),
    StructField("naturalGasImport",DoubleType()),
    StructField("naturalGasExport",DoubleType())
])

externalDebtStock = (sc.textFile("externalDebtStock.csv")
           .map(lambda text:text.split(';')))

header= externalDebtStock.first()
externalDebtStock = externalDebtStock.filter(lambda line:line!=header).toDF(header)
externalDebtStock = (externalDebtStock.withColumnRenamed("Country or Area","country")
                                      .withColumnRenamed("Value","externalDebtStock"))

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)

gdp = gdp.join(externalDebtStock,(gdp.cleanedName==externalDebtStock.country)&(gdp.year==externalDebtStock.Year),"left").drop(externalDebtStock.country).drop(externalDebtStock.Year)

gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

In [305]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType()),
    StructField("narrowMoneyM1",DoubleType()),
    StructField("broadMoney",DoubleType()),
    StructField("longTermInterest",DoubleType()),
    StructField("shortTermInterest",DoubleType()),
    StructField("importExportRatio",DoubleType()),
    StructField("newBusinessDiff",DoubleType()),
    StructField("naturalGasImport",DoubleType()),
    StructField("naturalGasExport",DoubleType()),
    StructField("externalDebtStock",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)

oilImport = (sc.textFile("oilImport.csv")
               .map(lambda text:text.split(';')))

header= oilImport.first()
oilImport = oilImport.filter(lambda line:line!=header).toDF(header)
oilImport = (oilImport.withColumnRenamed("Country or Area","country")
                      .withColumnRenamed("Quantity","oilImport"))

gdp = gdp.join(oilImport,(gdp.cleanedName==oilImport.country)&(gdp.year==oilImport.Year),"left").drop(oilImport.country).drop(oilImport.Year)

gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

In [307]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType()),
    StructField("narrowMoneyM1",DoubleType()),
    StructField("broadMoney",DoubleType()),
    StructField("longTermInterest",DoubleType()),
    StructField("shortTermInterest",DoubleType()),
    StructField("importExportRatio",DoubleType()),
    StructField("newBusinessDiff",DoubleType()),
    StructField("naturalGasImport",DoubleType()),
    StructField("naturalGasExport",DoubleType()),
    StructField("externalDebtStock",DoubleType()),
    StructField("oilImport",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)

oilExport = (sc.textFile("oilExport.csv")
               .map(lambda text:text.split(';')))

header= oilExport.first()
oilExport = oilExport.filter(lambda line:line!=header).toDF(header)
oilExport = (oilExport.withColumnRenamed("Country or Area","country")
                      .withColumnRenamed("Quantity","oilExport"))

gdp = gdp.join(oilExport,(gdp.cleanedName==oilExport.country)&(gdp.year==oilExport.Year),"left").drop(oilExport.country).drop(oilExport.Year)

gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

In [310]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType()),
    StructField("narrowMoneyM1",DoubleType()),
    StructField("broadMoney",DoubleType()),
    StructField("longTermInterest",DoubleType()),
    StructField("shortTermInterest",DoubleType()),
    StructField("importExportRatio",DoubleType()),
    StructField("newBusinessDiff",DoubleType()),
    StructField("naturalGasImport",DoubleType()),
    StructField("naturalGasExport",DoubleType()),
    StructField("externalDebtStock",DoubleType()),
    StructField("oilImport",DoubleType()),
    StructField("oilExport",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)

tradeImport = (sc.textFile("tradeImport.csv")
                 .map(lambda text:text.split(';')))

header= tradeImport.first()
tradeImport = tradeImport.filter(lambda line:line!=header).toDF(header)
tradeImport = (tradeImport.withColumnRenamed("Country or Area","country")
                          .withColumnRenamed("Quantity","tradeImport"))

gdp = gdp.join(tradeImport,(gdp.cleanedName==tradeImport.country)&(gdp.year==tradeImport.year),"left").drop(tradeImport.country).drop(tradeImport.year)

gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

In [313]:
schema = StructType([
    StructField("country", StringType()),
    StructField("cleanedName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType()),
    StructField("narrowMoneyM1",DoubleType()),
    StructField("broadMoney",DoubleType()),
    StructField("longTermInterest",DoubleType()),
    StructField("shortTermInterest",DoubleType()),
    StructField("importExportRatio",DoubleType()),
    StructField("newBusinessDiff",DoubleType()),
    StructField("naturalGasImport",DoubleType()),
    StructField("naturalGasExport",DoubleType()),
    StructField("externalDebtStock",DoubleType()),
    StructField("oilImport",DoubleType()),
    StructField("oilExport",DoubleType()),
    StructField("tradeImport",DoubleType()) 
])

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)

tradeExport = (sc.textFile("tradeExport.csv")
                 .map(lambda text:text.split(';')))

header= tradeExport.first()
tradeExport = tradeExport.filter(lambda line:line!=header).toDF(header)
tradeExport = (tradeExport.withColumnRenamed("Country or Area","country")
                          .withColumnRenamed("Quantity","tradeExport"))

gdp = gdp.join(tradeExport,(gdp.cleanedName==tradeExport.country)&(gdp.year==tradeExport.year),"left").drop(tradeExport.country).drop(tradeExport.year)
gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

In [318]:
schema = StructType([
    StructField("country", StringType()),
    StructField("countryName", StringType()),
    StructField("countryCode", StringType()),
    StructField("year", IntegerType()),
    StructField("gdp", DoubleType()),
    StructField("gdpNote", IntegerType()),
    StructField("debt", DoubleType()),
    StructField("inflation", DoubleType()),
    StructField("unemployment", DoubleType()),
    StructField("lendingInterest", DoubleType()),
    StructField("techExport", DoubleType()),
    StructField("totalReserves", DoubleType()),
    StructField("fdi", DoubleType()),
    StructField("cropProduction", DoubleType()),
    StructField("oilPrice", DoubleType()),
    StructField("gasPrice", DoubleType()),
    StructField("workingPop", DoubleType()),
    StructField("exchangeRate", DoubleType()),
    StructField("oilProd", DoubleType()),
    StructField("oilImportPrice", DoubleType()),
    StructField("ppp",DoubleType()),
    StructField("researcherPer1k",DoubleType()),
    StructField("ppi", DoubleType()),
    StructField("totalIndProd", DoubleType()),
    StructField("sharePrice",DoubleType()),
    StructField("pollution", DoubleType()),
    StructField("narrowMoneyM1",DoubleType()),
    StructField("broadMoney",DoubleType()),
    StructField("longTermInterest",DoubleType()),
    StructField("shortTermInterest",DoubleType()),
    StructField("importExportRatio",DoubleType()),
    StructField("newBusinessDiff",DoubleType()),
    StructField("naturalGasImport",DoubleType()),
    StructField("naturalGasExport",DoubleType()),
    StructField("externalDebtStock",DoubleType()),
    StructField("oilImport",DoubleType()),
    StructField("oilExport",DoubleType()),
    StructField("tradeImport",DoubleType()),
    StructField("tradeExport",DoubleType())
])

gdp = spark.read.csv("gdpCleaned.csv",mode="DROPMALFORMED",header=True,schema=schema)
gdp = gdp.orderBy("country","countryName","countryCode","year")

gdp.coalesce(1).write.csv("gdpCleanedData",header=True)

In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

text = sc.textFile('filled.csv')
text = text.map(lambda text:text.split(";"))

# countries = [u'Sweden', u'Singapore', u'Germany', u'France', u'Greece', u'Belgium', u'Finland', u'United States', u'India', u'China', u'Italy', u'Norway', u'Spain', u'Denmark', u'Russian Federation', u'Ireland', u'Iceland', u'Israel', u'Mexico', u'Saudi Arabia', u'Switzerland', u'Canada', u'Brazil', u'Japan', u'New Zealand', u'Portugal', u'Australia', u'South Africa', u'United Kingdom', u'Netherlands']
countries = ['Canada','India','United States','China','France']
header = text.first()
features = text.filter(lambda line:line!=header)

for country in countries:
    data = features.filter(lambda text:text[-1]==country)
    data = data.filter(lambda text:text[-3]>1984)
    test = data.filter(lambda text:text[-3] in ['2014','2015'])
    train = data.filter(lambda text:text[-3] not in ['2014','2015'])
    train = train.map(lambda text:(float(text[-2]),Vectors.dense(text[0:3]))).toDF(['label','features'])
    test = test.map(lambda text:(float(text[-2]),Vectors.dense(text[0:3]))).toDF(['label','features'])
    lr = LinearRegression(maxIter=3)
    pipeline = Pipeline(stages=[lr])
    paramGrid = (ParamGridBuilder().addGrid(lr.regParam, [1, 0.75 ,0.5, 0.1, 0.01])
                                   .addGrid(lr.elasticNetParam, [0.0,0.5, 1.0])
                                   .build())
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=RegressionEvaluator(),
                              numFolds=3)
    cvModel = crossval.fit(train)
    prediction = cvModel.transform(test)
    prediction.show()

+-----------+--------------------+------------------+
|      label|            features|        prediction|
+-----------+--------------------+------------------+
|2.472892638|[68.2722,1.237576...|1.7333235964125357|
|1.078268751|[67.8922,1.253367...|1.7216043513520276|
+-----------+--------------------+------------------+

+-----------+--------------------+-----------------+
|      label|            features|       prediction|
+-----------+--------------------+-----------------+
|7.243471746|[38285.056,141.87...| 8.08099797760533|
|7.570130368|[37762.815,141.87...|8.106384150562933|
+-----------+--------------------+-----------------+

+-----------+--------------------+------------------+
|      label|            features|        prediction|
+-----------+--------------------+------------------+
|2.427795636|[5.0,44.221248625...|1.9496426632383344|
|2.425970526|[5.0,44.618038175...|0.9735467793007544|
+-----------+--------------------+------------------+

+-----------+------------------

Normalized using L^1 norm
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+

Normalized using L^inf norm
+---+--------------+--------------+
| id|      features|  normFeatures|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+

