# Preprocessing of TED CAN (contract award notices) CSV files

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
from pyspark.sql import HiveContext
import json
import time
import sys
from datetime import datetime
from pyspark.sql.types import *
import re

# Spark is run with the master URL "local[<nPartitions>]".
nPartitions = 4
masterUrl = "local[" + str(nPartitions) + "]"
conf = SparkConf().setMaster(masterUrl)
# Uncomment when re-running this cell in a session that already owns a context.
#sc.stop()
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
# Session-level Spark SQL setting for Parquet binary columns (see Spark SQL docs).
sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")


## Load country code file
Used to map ISO country codes to full country names.

In [None]:
# Read the country-code file into a list of lines with trailing whitespace
# (including the newline) removed. The `with` block closes the file handle
# as soon as the list is built instead of leaving it to the garbage collector.
with open('code/data/countrycodes.csv') as countryCodesFile:
    countryCodesRaw = [line.rstrip() for line in countryCodesFile]

In [None]:
# Each raw line is split on ' - ': the first piece is the lookup key and the
# second piece is the value stored for it. A line without the separator raises
# IndexError, exactly as an explicit loop would.
countryCodes = dict(
    (fields[0], fields[1])
    for fields in (entry.split(' - ') for entry in countryCodesRaw)
)
# An empty code maps to an empty name so blank cells can be looked up safely.
countryCodes[''] = ''

## Load CPV meaning file

In [None]:
# Read the CPV meaning file into a list of lines with trailing whitespace
# (including the newline) removed. The `with` block guarantees the file handle
# is closed once the list has been built.
with open('code/data/CPVmeaning.csv') as cpvMeaningFile:
    CPVmeaningRaw = [line.rstrip() for line in cpvMeaningFile]

In [None]:
# Each raw line is split on ';'. The key is the first 8 characters of the first
# field and the value is the second field, taken as-is.
CPVmeanings = dict(
    (fields[0][0:8], fields[1])
    for fields in (entry.split(';') for entry in CPVmeaningRaw)
)
# An empty code maps to an empty meaning so blank cells can be looked up safely.
CPVmeanings[''] = ''

In [None]:
# Read the CPV 2003/2007 correspondence file into a list of lines with trailing
# whitespace (including the newline) removed. The `with` block guarantees the
# file handle is closed once the list has been built.
with open('code/data/CPVcorrespondance2003_2007.csv') as cpvCorrespondanceFile:
    CPVcorrespondanceRaw = [line.rstrip() for line in cpvCorrespondanceFile]

In [None]:
# Each raw line is split on ';'. Both the key (first field) and the value
# (second field) are truncated to their first 8 characters.
CPVcorrespondance = dict(
    (fields[0][0:8], fields[1][0:8])
    for fields in (entry.split(';') for entry in CPVcorrespondanceRaw)
)
# Missing (None) and empty codes both resolve to an empty string.
CPVcorrespondance[None] = ''
CPVcorrespondance[''] = ''

## CSV row processor and schema
Convert CSV rows to nicer format and schema

In [None]:
def _clean_str(value):
    """Return *value* without surrounding whitespace, or None when it is None."""
    return None if value is None else value.strip()


def _to_int(value):
    """Parse a numeric CSV cell into an int, truncating any fractional part.

    None and blank cells yield None. Any other text that float() rejects
    raises ValueError.
    """
    if value is None:
        return None
    text = value.strip()
    return int(float(text)) if text else None


def _to_iso_date(value):
    """Convert a 'dd-Mon-yy' CSV cell to 'YYYY-MM-DD'; None/blank yields None.

    NOTE(review): '%b' month names are locale dependent; this assumes the
    input uses the month abbreviations of the process locale -- confirm.
    """
    if value is None:
        return None
    text = value.strip()
    if not text:
        return None
    return datetime.strptime(text, '%d-%b-%y').strftime('%Y-%m-%d')


# Source column and converter for output positions 10..50, in output order.
# Every one of these fields is a direct conversion of a single CSV column.
_RAW_COLUMNS = (
    ('CPV', _clean_str),
    ('YEAR', _to_int),
    ('ID_TYPE', _clean_str),
    ('XSD_VERSION', _clean_str),
    ('CANCELLED', _clean_str),
    ('CORRECTIONS', _to_int),

    ('CAE_NATIONALID', _clean_str),
    ('CAE_ADDRESS', _clean_str),
    ('CAE_TOWN', _clean_str),
    ('CAE_POSTAL_CODE', _clean_str),

    ('CAE_TYPE', _clean_str),
    ('MAIN_ACTIVITY', _clean_str),
    ('B_ON_BEHALF', _clean_str),
    ('TYPE_OF_CONTRACT', _clean_str),
    ('TAL_LOCATION_NUTS', _clean_str),
    ('B_FRA_AGREEMENT', _clean_str),
    ('B_DYN_PURCH_SYST', _clean_str),
    ('ADDITIONAL_CPVS', _clean_str),
    ('B_GPA', _clean_str),
    ('VALUE_EURO_FIN_1', _to_int),
    ('VALUE_EURO_FIN_2', _to_int),
    ('TOP_TYPE', _clean_str),
    ('CRIT_CODE', _clean_str),
    ('CRIT_CRITERIA', _clean_str),
    ('CRIT_WEIGHTS', _clean_str),
    ('B_ELECTRONIC_AUCTION', _clean_str),
    ('NUMBER_AWARDS', _to_int),

    ('WIN_ADDRESS', _clean_str),
    ('WIN_TOWN', _clean_str),
    ('WIN_POSTAL_CODE', _clean_str),

    ('ID_AWARD', _clean_str),
    ('CONTRACT_NUMBER', _clean_str),
    ('LOT_NUMBER', _clean_str),
    ('TITLE', _clean_str),
    ('NUMBER_OFFERS_ELECTR', _to_int),
    ('AWARD_EST_VALUE_EURO', _to_int),
    ('AWARD_VALUE_EURO', _to_int),
    ('AWARD_VALUE_EURO_FIN_1', _to_int),
    ('B_SUBCONTRACTED', _clean_str),
    ('B_EU_FUNDS', _clean_str),
    ('DT_AWARD', _to_iso_date),
)


def process(row):
    """Convert one raw CSV row into a flat 51-field tuple of cleaned values.

    Parameters:
        row: mapping-like object indexable by CSV column name (e.g. a Spark
            Row or a dict) whose cells are strings or None.

    Returns:
        On success, a 51-element tuple: ten derived fields (country names,
        authority/contractor names, dispatch date, CPV meaning, contract
        value, number of offers, CPV code, HTML link to the notice) followed
        by the cleaned raw columns listed in ``_RAW_COLUMNS``. Empty strings
        are replaced by None.
        On any failure, the 3-element tuple ``(False, row, exception)``.

    Missing cells are treated uniformly: both None and blank text become None
    for string, numeric and date columns alike. Unknown country or CPV codes
    still raise KeyError internally and therefore produce the error triple.
    """
    try:
        dispatch_date = _to_iso_date(row['DT_DISPATCH'])

        # The raw id is reordered as <chars from index 4 on>-<first 4 chars>.
        # A missing id yields no link instead of failing on the slice.
        notice_id = _clean_str(row['ID_NOTICE_CAN'])
        if notice_id:
            notice_id = notice_id[4:] + '-' + notice_id[0:4]
            award_notice_id_link = ("<a href='http://ted.europa.eu/udl?uri=TED:NOTICE:"
                                    + notice_id
                                    + ":TEXT:EN:HTML' target='_blank'>"
                                    + notice_id + "</a>")
        else:
            award_notice_id_link = None

        authority_code = _clean_str(row['ISO_COUNTRY_CODE'])
        contracting_authority_country = (None if authority_code is None
                                         else countryCodes[authority_code])
        authority_name = row['CAE_NAME']
        contracting_authority_name = (None if authority_name is None
                                      else authority_name.replace("_", "-").strip())

        winner_code = _clean_str(row['WIN_COUNTRY_CODE'])
        contractor_country = (None if winner_code is None
                              else countryCodes[winner_code])
        winner_name = row['WIN_NAME']
        contractor_name = (None if winner_name is None
                           else winner_name.replace("_", "-").strip())

        # Parsed once here; the same column is emitted again further down as
        # AWARD_VALUE_EURO through _RAW_COLUMNS.
        contract_value_euros = _to_int(row['AWARD_VALUE_EURO'])
        number_offers_received = _to_int(row['NUMBER_OFFERS'])

        CPV_code = _clean_str(row['CPV'])
        # The translation through CPVcorrespondance is currently switched off:
        CPV_code_2008 = CPV_code
        #if CPV_code in CPVcorrespondance:
        #    CPV_code_2008=CPVcorrespondance[CPV_code]
        # A missing CPV gives a missing meaning rather than a KeyError.
        CPV_code_meaning = (None if CPV_code_2008 is None
                            else CPVmeanings[CPV_code_2008])

        derived = [
            contracting_authority_country,
            contracting_authority_name,
            dispatch_date,
            CPV_code_meaning,
            contractor_country,
            contractor_name,
            contract_value_euros,
            number_offers_received,
            CPV_code_2008,
            award_notice_id_link,
        ]
        raw = [convert(row[column]) for column, convert in _RAW_COLUMNS]

        # Normalise empty strings to None so blank cells become nulls.
        awardNoticeRow = tuple(None if elt == "" else elt for elt in derived + raw)
    except Exception as inst:
        # NOTE(review): this 3-element error marker does not have the 51 fields
        # of a successful row; callers building a fixed-width table from the
        # results presumably need to filter these out first -- confirm.
        awardNoticeRow = (False, row, inst)

    return awardNoticeRow

In [None]:
# (column name, Spark type class) for every output column, in output order.
# Each column is declared nullable when the schema is built below.
_schemaColumns = [
    ("Contracting_Authority_Country", StringType),
    ("Contracting_Authority_Name", StringType),
    ("Dispatch_Date", StringType),
    ("CPV_Code_Meaning", StringType),
    ("Contractor_Country", StringType),
    ("Contractor_Name", StringType),
    ("Contract_Value_Euros", LongType),
    ("Number_Offers_Received", IntegerType),
    ("CPV_Code_2008", StringType),
    ("Award_Notice_Id_Link", StringType),

    ("CPV_Code", StringType),
    ("YEAR", IntegerType),
    ("ID_TYPE", StringType),
    ("XSD_VERSION", StringType),
    ("CANCELLED", StringType),
    ("CORRECTIONS", IntegerType),

    ("CAE_NATIONALID", StringType),
    ("CAE_ADDRESS", StringType),
    ("CAE_TOWN", StringType),
    ("CAE_POSTAL_CODE", StringType),

    ("CAE_TYPE", StringType),
    ("MAIN_ACTIVITY", StringType),
    ("B_ON_BEHALF", StringType),
    ("TYPE_OF_CONTRACT", StringType),
    ("TAL_LOCATION_NUTS", StringType),
    ("B_FRA_AGREEMENT", StringType),
    ("B_DYN_PURCH_SYST", StringType),
    ("ADDITIONAL_CPVS", StringType),
    ("B_GPA", StringType),
    ("VALUE_EURO_FIN_1", LongType),
    ("VALUE_EURO_FIN_2", LongType),
    ("TOP_TYPE", StringType),
    ("CRIT_CODE", StringType),
    ("CRIT_CRITERIA", StringType),
    ("CRIT_WEIGHTS", StringType),
    ("B_ELECTRONIC_AUCTION", StringType),
    ("NUMBER_AWARDS", IntegerType),

    ("WIN_ADDRESS", StringType),
    ("WIN_TOWN", StringType),
    ("WIN_POSTAL_CODE", StringType),

    ("ID_AWARD", StringType),
    ("CONTRACT_NUMBER", StringType),
    ("LOT_NUMBER", StringType),
    ("TITLE", StringType),
    ("NUMBER_OFFERS_ELECTR", IntegerType),
    ("AWARD_EST_VALUE_EURO", LongType),
    ("AWARD_VALUE_EURO", LongType),
    ("AWARD_VALUE_EURO_FIN_1", LongType),
    ("B_SUBCONTRACTED", StringType),
    ("B_EU_FUNDS", StringType),
    ("DT_AWARD", StringType),
]

# Schema of the converted rows: one nullable field per entry above.
schema = StructType(
    [StructField(colName, colType(), True) for colName, colType in _schemaColumns]
)


## Load CSV data, convert, and save to parquet
CSV files are in ../data

Four files need to be processed:
* TED_CAN_2006.csv
* TED_CAN_2007.csv
* TED_CAN_2008.csv
* TED_CAN_2009_2015.csv

which can be downloaded from https://data.europa.eu/euodp/en/data/dataset/ted-csv

Preprocess to repair malformed CSV lines (removes `""` and replaces `\"` with `"`)
* time sed 's/\"\"//g' TED_CAN_2009_2015.csv |sed 's/\\\"/"/g'> TED_CAN_2009_2015_2.csv
* time sed 's/\"\"//g' TED_CAN_2008.csv |sed 's/\\\"/"/g'> TED_CAN_2008_2.csv
* time sed 's/\"\"//g' TED_CAN_2007.csv |sed 's/\\\"/"/g'> TED_CAN_2007_2.csv
* time sed 's/\"\"//g' TED_CAN_2006.csv |sed 's/\\\"/"/g'> TED_CAN_2006_2.csv



In [None]:
# CSV file converted by this run. To process another file, comment this line
# out and enable one of the alternatives below.
# NOTE(review): the active path ends in "_3.csv" and sits under data/, while
# the alternatives use data2/ and the documented sed step writes "*_2.csv"
# files -- confirm which preprocessing produces the "_3" file.
pathData='data/TED_CAN_2009_2015_3.csv'
#pathData='data2/TED_CAN_2015_3.csv'
#pathData='data2/TED_CAN_2008_2.csv'
#pathData='data2/TED_CAN_2007_2.csv'
#pathData='data2/TED_CAN_2006_2.csv'


### Load

In [None]:
# Read the CSV through the spark-csv data source: the first line is the header
# and schema inference is switched off.
csvReader = sqlContext.read.format('com.databricks.spark.csv')
csvReader = csvReader.options(header='true', inferschema='false')
csvfile = csvReader.load(pathData)  # .repartition(2000)
# Register the raw data as temporary table "csvData" for ad-hoc SQL queries.
csvfile.registerTempTable("csvData")


### Process

In [None]:
# Convert every CSV row. process() returns either a full row tuple or, when
# anything goes wrong, the 3-element marker (False, row, exception).
processedData = csvfile.rdd.map(process)


def _isFailedRow(result):
    """Return True for the (False, row, exception) marker emitted on failure."""
    return len(result) == 3 and result[0] is False


# Keep failures apart: a 3-element marker cannot be fitted to the 51-column
# schema, so a single bad row would otherwise abort the DataFrame conversion.
# `failedRows` stays available for inspection (e.g. failedRows.take(5)).
failedRows = processedData.filter(_isFailedRow)
df = processedData.filter(lambda result: not _isFailedRow(result)).toDF(schema)

### Save

In [None]:
# Write in 'append' mode so the output of each input file is added to the same
# Parquet dataset.
# NOTE(review): re-running this cell for the same input presumably appends the
# same rows again -- confirm before re-executing.
parquetWriter = df.write.mode('append')
parquetWriter.parquet('ted.parquet')