## 1. The World Bank's international debt data
<p>No es que los humanos solo tengamos deudas para administrar nuestras necesidades. Un país también puede endeudarse para administrar su economía. Por ejemplo, el gasto en infraestructura es un ingrediente costoso requerido para que los ciudadanos de un país lleven una vida cómoda. El Banco Mundial es la organización que proporciona deuda a los países.</p>

<!-- <p>En este notebook, vamos a analizar los datos de la deuda internacional recopilados por el Banco Mundial. El conjunto de datos contiene información sobre el monto de la deuda (en USD) que deben los países en desarrollo en varias categorías.</p>  -->
    

### Autor: Pablo Revuelta Stobbs

##### Fecha:14/01/2021

"Disbursements on external debt, long-term (DIS, current US$)",DT.DIS.DLXF.CD

"Interest payments on external debt, long-term (INT, current US$)",DT.INT.DLXF.CD

"PPG, bilateral (AMT, current US$)",DT.AMT.BLAT.CD

"PPG, bilateral (DIS, current US$)",DT.DIS.BLAT.CD

"PPG, bilateral (INT, current US$)",DT.INT.BLAT.CD

"PPG, multilateral (AMT, current US$)",DT.AMT.MLAT.CD

"PPG, multilateral (DIS, current US$)",DT.DIS.MLAT.CD

"PPG, multilateral (INT, current US$)",DT.INT.MLAT.CD

"PPG, official creditors (AMT, current US$)",DT.AMT.OFFT.CD

"PPG, official creditors (DIS, current US$)",DT.DIS.OFFT.CD

"PPG, official creditors (INT, current US$)",DT.INT.OFFT.CD

"Principal repayments on external debt, long-term (AMT, current US$)",DT.AMT.DLXF.CD

"Interest payments on external debt, private nonguaranteed (PNG) (INT, current US$)",DT.INT.DPNG.CD

"PPG, bonds (AMT, current US$)",DT.AMT.PBND.CD

"PPG, bonds (INT, current US$)",DT.INT.PBND.CD

"PPG, commercial banks (AMT, current US$)",DT.AMT.PCBK.CD

"PPG, commercial banks (DIS, current US$)",DT.DIS.PCBK.CD

"PPG, commercial banks (INT, current US$)",DT.INT.PCBK.CD

"PPG, other private creditors (AMT, current US$)",DT.AMT.PROP.CD

"PPG, other private creditors (DIS, current US$)",DT.DIS.PROP.CD

"PPG, other private creditors (INT, current US$)",DT.INT.PROP.CD

"PPG, private creditors (AMT, current US$)",DT.AMT.PRVT.CD

"PPG, private creditors (DIS, current US$)",DT.DIS.PRVT.CD

"PPG, private creditors (INT, current US$)",DT.INT.PRVT.CD

"Principal repayments on external debt, private nonguaranteed (PNG) (AMT, current US$)",DT.AMT.DPNG.CD

<p>Vamos a encontrar las respuestas a preguntas como:

<p>¿Cuál es el monto total de la deuda que deben los países enumerados en el conjunto de datos?
<p>¿Qué país posee la cantidad máxima de deuda y cómo se ve esa cantidad?
<p>¿Cuál es el monto promedio de la deuda de los países a través de diferentes indicadores de deuda?
    
Además tenemos otro dataset en el que encontramos información histórica de algunos índices de desarrollo, entre los que se encuentran algunos de deuda como son:

Series Name,Series Code
"Birth rate, crude (per 1,000 people)",SP.DYN.CBRT.IN

"Central government debt, total (current LCU)",GC.DOD.TOTL.CN

"Central government debt, total (% of GDP)",GC.DOD.TOTL.GD.ZS

## INDICE

#### 1. Inicializar y cargar el contexto spark

##### 2. Número de países distintos en cada dataset. Coinciden?

##### 3. Total de deuda contraida por cada pais: total amount of debt (in USD) DT.AMT.MLAT.CD

##### 4. Media de los indicadores de deuda (tabla uno): DT.AMT.BLAT.CD, DT.DIS.BLAT.CD, DT.INT.BLAT.CD

##### 5. Los 20 paises con DT.AMT.DLXF.CD más alto

##### 6. Pais con los datos informados todos los años.

##### 7. Media anual de los distintos indicadores de desarrollo

##### 8. Podrías decirme el total de deuda acumulada DT.AMT.MLAT.CD por los 10 países con un valor en media menor de SP.DYN.CBRT.IN

##### 9. ¿Hay alguna relación entre los nacimientos y el indicador DT.AMT.DLXF.CD? ¿Cómo podrías demostrarlo?

--------------------------------------------------------------------

#### 1. Inicializar y cargar el contexto spark

In [1]:
from pyspark import SparkContext
sc = SparkContext()

Para corroborar que ha funcionado la inicialización y carga del contexto Spark, se mira la configuración.

In [2]:
conf = sc.getConf()

In [3]:
conf.getAll()

[('spark.driver.port', '41663'),
 ('spark.driver.host', '172.17.0.2'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1610662371238'),
 ('spark.app.name', 'pyspark-shell')]

## Parseado de los RDDs

Carga de los datos

In [4]:
path_file1 = "./indicadores_desarrollo.csv"
path_file2 = "./indicadores_deuda.csv"

In [5]:
raw_data_desarrollo = sc.textFile(path_file1)
raw_data_deuda = sc.textFile(path_file2, )

Se comprueba que se han cargado correctamente los datos mediante una accion, en este caso la función count().

In [6]:
(raw_data_desarrollo.count(),raw_data_deuda.count())

(135, 125)

Conocer los datos mediante take(), para ver como están estructurados

In [7]:
raw_data_desarrollo.take(5)

['country_name,country_code,anho,GC.DOD.TOTL.CN,GC.DOD.TOTL.GD.ZS,SP.DYN.CBRT.IN',
 'Albania,ALB,2014 [YR2014],1023040800000,73.3202469198429,12.259',
 'Albania,ALB,2015 [YR2015],1145500940000,79.8644448747259,12.197',
 'Albania,ALB,2016 [YR2016],1188814490000,80.7355748396923,12.08',
 'Australia,AUS,2014 [YR2014],674700000,0.042207528166503,13.2']

In [8]:
raw_data_deuda.take(5)

['country_name,country_code,DT.AMT.BLAT.CD,DT.AMT.DLXF.CD,DT.AMT.DPNG.CD,DT.AMT.MLAT.CD,DT.AMT.OFFT.CD,DT.AMT.PBND.CD,DT.AMT.PCBK.CD,DT.AMT.PROP.CD,DT.AMT.PRVT.CD,DT.DIS.BLAT.CD,DT.DIS.DLXF.CD,DT.DIS.MLAT.CD,DT.DIS.OFFT.CD,DT.DIS.PCBK.CD,DT.DIS.PROP.CD,DT.DIS.PRVT.CD,DT.INT.BLAT.CD,DT.INT.DLXF.CD,DT.INT.DPNG.CD,DT.INT.MLAT.CD,DT.INT.OFFT.CD,DT.INT.PBND.CD,DT.INT.PCBK.CD,DT.INT.PROP.CD,DT.INT.PRVT.CD',
 'Afghanistan,AFG,61739336.9,100847181.9,,39107845,100847181.9,,,,,49114729.4,72894453.7,23779724.3,72894453.7,,,,39903620.1,53239440.1,,13335820,53239440.1,,,,',
 'Albania,ALB,54250280.6,790248675.2,514185620,182197616.7,236447897.3,0,39445139.5,170018.4,39615157.9,76050616.1,317194512.5,234321242.3,310371858.4,4542664.9,2279989.2,6822654.1,13847333.6,165602386.9,87884000,28101536.1,41948869.7,31030688.2,4618504.3,120324.7,35769517.2',
 'Algeria,DZA,95188724.6,171185188.1,75420000,0,95188724.6,,576463.5,0,576463.5,10320772.2,10320772.2,,10320772.2,,,,7680627.6,19031728.7,3220000,8094779,

Se elimina el header de las bases de datos

In [9]:
head_raw_data_desarrollo = raw_data_desarrollo.take(1)[0] #le damos un valor a take(1)[0]
raw_data_desarrollo =  raw_data_desarrollo.filter(lambda x: x != head_raw_data_desarrollo) #creamos una funcion filter con lambda para eliminar la cabecera
head_raw_data_deuda = raw_data_deuda.take(1)[0]
raw_data_deuda =  raw_data_deuda.filter(lambda x: x != head_raw_data_deuda)

In [10]:
raw_data_desarrollo.take(1)

['Albania,ALB,2014 [YR2014],1023040800000,73.3202469198429,12.259']

In [11]:
raw_data_deuda.take(1)

['Afghanistan,AFG,61739336.9,100847181.9,,39107845,100847181.9,,,,,49114729.4,72894453.7,23779724.3,72894453.7,,,,39903620.1,53239440.1,,13335820,53239440.1,,,,']

Se observa que efectivamente se han eliminado las cabeceras

Previamente a contestar a las preguntas, se realizará el parseado de los datasets para dejarlos listos para trabajar con ellos

In [12]:
parsed_data_desarrollo = raw_data_desarrollo.map(lambda x: x.split(",")) #se indica que se separen las obs. mediante un map por la ","
parsed_data_deuda = raw_data_deuda.map(lambda x: x.split(","))

In [13]:
parsed_data_desarrollo.map(lambda x: (len(x))).distinct().count()

2

In [14]:
parsed_data_deuda.map(lambda x: (len(x))).distinct().count()

2

Hay dos length distinta en el RDD, eso indica que hay valores que no se han parseado bien.

Vamos a observar donde está el error

In [15]:
parsed_data_desarrollo.map(lambda x: (len(x), x[0:2])).take(10)

[(6, ['Albania', 'ALB']),
 (6, ['Albania', 'ALB']),
 (6, ['Albania', 'ALB']),
 (6, ['Australia', 'AUS']),
 (6, ['Australia', 'AUS']),
 (6, ['Australia', 'AUS']),
 (7, ['"Bahamas', ' The"']),
 (7, ['"Bahamas', ' The"']),
 (7, ['"Bahamas', ' The"']),
 (6, ['Barbados', 'BRB'])]

Se observa que hay filas en las que length es 6 y otras con length igual a 7, esto es debido a que en las de 7 se ha separado parte del nombre del país.

In [16]:
parsed_data_desarrollo.map(lambda x: (len(x), x)).filter(lambda x: x[0] == 6).take(2)

[(6,
  ['Albania',
   'ALB',
   '2014 [YR2014]',
   '1023040800000',
   '73.3202469198429',
   '12.259']),
 (6,
  ['Albania',
   'ALB',
   '2015 [YR2015]',
   '1145500940000',
   '79.8644448747259',
   '12.197'])]

Se guarda la lista que indica el length y la lista de datos

In [17]:
length_desarrollo = parsed_data_desarrollo.map(lambda x: (len(x), x))
length_desarrollo.take(10)

[(6,
  ['Albania',
   'ALB',
   '2014 [YR2014]',
   '1023040800000',
   '73.3202469198429',
   '12.259']),
 (6,
  ['Albania',
   'ALB',
   '2015 [YR2015]',
   '1145500940000',
   '79.8644448747259',
   '12.197']),
 (6,
  ['Albania',
   'ALB',
   '2016 [YR2016]',
   '1188814490000',
   '80.7355748396923',
   '12.08']),
 (6,
  ['Australia',
   'AUS',
   '2014 [YR2014]',
   '674700000',
   '0.042207528166503',
   '13.2']),
 (6,
  ['Australia',
   'AUS',
   '2015 [YR2015]',
   '762718000',
   '0.0469480198522591',
   '12.9']),
 (6,
  ['Australia',
   'AUS',
   '2016 [YR2016]',
   '907555000',
   '0.054648482520169',
   '12.9']),
 (7,
  ['"Bahamas',
   ' The"',
   'BHS',
   '2014 [YR2014]',
   '5158420000',
   '47.2672793747079',
   '14.34']),
 (7,
  ['"Bahamas',
   ' The"',
   'BHS',
   '2015 [YR2015]',
   '5655150000',
   '48.1195170307089',
   '14.224']),
 (7,
  ['"Bahamas',
   ' The"',
   'BHS',
   '2016 [YR2016]',
   '6138814000',
   '51.4207431481606',
   '14.131']),
 (6,
  ['Barbados

Se define una función que solucionará el problema, ya que cuando observe que hay un valor 7 en el length, lo que realizará será sumar el valor de la lista en la posición [0] y el de la posición [1].

In [22]:
def parse_desarrollo(line):
    if line[0] == 6:
        return line[1]
    else:
        l = line[1][1:]
        return l.insert(0, line[1][0] + line[1][1])

In [23]:
parsed_data_desarrollo1 = length_desarrollo.map(lambda x: parse_desarrollo(x))
parsed_data_desarrollo1.take(7)

[['Albania',
  'ALB',
  '2014 [YR2014]',
  '1023040800000',
  '73.3202469198429',
  '12.259'],
 ['Albania',
  'ALB',
  '2015 [YR2015]',
  '1145500940000',
  '79.8644448747259',
  '12.197'],
 ['Albania',
  'ALB',
  '2016 [YR2016]',
  '1188814490000',
  '80.7355748396923',
  '12.08'],
 ['Australia',
  'AUS',
  '2014 [YR2014]',
  '674700000',
  '0.042207528166503',
  '13.2'],
 ['Australia',
  'AUS',
  '2015 [YR2015]',
  '762718000',
  '0.0469480198522591',
  '12.9'],
 ['Australia',
  'AUS',
  '2016 [YR2016]',
  '907555000',
  '0.054648482520169',
  '12.9'],
 None]

In [24]:
parsed_data_deuda.map(lambda x: (len(x), x[0:2])).take(10)

[(27, ['Afghanistan', 'AFG']),
 (27, ['Albania', 'ALB']),
 (27, ['Algeria', 'DZA']),
 (27, ['Angola', 'AGO']),
 (27, ['Armenia', 'ARM']),
 (27, ['Azerbaijan', 'AZE']),
 (27, ['Bangladesh', 'BGD']),
 (27, ['Belarus', 'BLR']),
 (27, ['Belize', 'BLZ']),
 (27, ['Benin', 'BEN'])]

Realizamos lo mismo para la base de datos de deuda. En este caso vamos a buscar cual es la diferencia de length mediante un filtrado ya que al observar sus distancias es dificil encontrar el valor distinto de 27.

In [25]:
deuda_27 = parsed_data_deuda.map(lambda x: (len(x), x[0:2]))

Para saber la distancia que no es igual a 27 hacemos:

In [26]:
deuda_27.filter(lambda x: x[0] != 27).take(5)

[(28, ['"Congo', ' Dem. Rep."']),
 (28, ['"Congo', ' Rep."']),
 (28, ['"Egypt', ' Arab Rep."']),
 (28, ['"Gambia', ' The"']),
 (28, ['"Iran', ' Islamic Rep."'])]

Se observa que para la base de datos de deuda, hay filas con un length de 28 por el mismo motivo que para la base de datos de desarrollo

Guardamos la lista que indica el length y la lista

In [27]:
length_deuda = parsed_data_deuda.map(lambda x: (len(x), x))
length_deuda.take(5)

[(27,
  ['Afghanistan',
   'AFG',
   '61739336.9',
   '100847181.9',
   '',
   '39107845',
   '100847181.9',
   '',
   '',
   '',
   '',
   '49114729.4',
   '72894453.7',
   '23779724.3',
   '72894453.7',
   '',
   '',
   '',
   '39903620.1',
   '53239440.1',
   '',
   '13335820',
   '53239440.1',
   '',
   '',
   '',
   '']),
 (27,
  ['Albania',
   'ALB',
   '54250280.6',
   '790248675.2',
   '514185620',
   '182197616.7',
   '236447897.3',
   '0',
   '39445139.5',
   '170018.4',
   '39615157.9',
   '76050616.1',
   '317194512.5',
   '234321242.3',
   '310371858.4',
   '4542664.9',
   '2279989.2',
   '6822654.1',
   '13847333.6',
   '165602386.9',
   '87884000',
   '28101536.1',
   '41948869.7',
   '31030688.2',
   '4618504.3',
   '120324.7',
   '35769517.2']),
 (27,
  ['Algeria',
   'DZA',
   '95188724.6',
   '171185188.1',
   '75420000',
   '0',
   '95188724.6',
   '',
   '576463.5',
   '0',
   '576463.5',
   '10320772.2',
   '10320772.2',
   '',
   '10320772.2',
   '',
   '',
   ''

Creamos una función con la misma finalidad que para la base de datos de desarrollo

In [28]:
def parse_deuda(line):
    if line[0] == 27:
        return line[1]
    else:
        l = line[1][1:]
        return l.insert(0, line[1][0] + line[1][1])

La ejecutamos sobre la lista

In [29]:
parsed_data_deuda1 = length_deuda.map(lambda x: parse_deuda(x))
parsed_data_deuda1.take(2)

[['Afghanistan',
  'AFG',
  '61739336.9',
  '100847181.9',
  '',
  '39107845',
  '100847181.9',
  '',
  '',
  '',
  '',
  '49114729.4',
  '72894453.7',
  '23779724.3',
  '72894453.7',
  '',
  '',
  '',
  '39903620.1',
  '53239440.1',
  '',
  '13335820',
  '53239440.1',
  '',
  '',
  '',
  ''],
 ['Albania',
  'ALB',
  '54250280.6',
  '790248675.2',
  '514185620',
  '182197616.7',
  '236447897.3',
  '0',
  '39445139.5',
  '170018.4',
  '39615157.9',
  '76050616.1',
  '317194512.5',
  '234321242.3',
  '310371858.4',
  '4542664.9',
  '2279989.2',
  '6822654.1',
  '13847333.6',
  '165602386.9',
  '87884000',
  '28101536.1',
  '41948869.7',
  '31030688.2',
  '4618504.3',
  '120324.7',
  '35769517.2']]

Observamos mediante count que siguen habiendo los mismo registros

In [31]:
#parsed_data_desarrollo1.map(lambda x: (len(x))).distinct().count()

In [55]:
(parsed_data_desarrollo1.count(), parsed_data_deuda1.count())

(134, 124)

El parseado realizado da problemas, se realizará mediante filtrado

#### Parseado mediante filtro del dataset Desarrollo

In [32]:
desarrollo_length7 = parsed_data_desarrollo.filter(lambda x: (len(x) != 6)) #filtramos por los que tengan un length distinto de 6

In [33]:
desarrollo_length7 = desarrollo_length7.map(lambda x: (x[0],x[2],x[3],x[4],x[5],x[6])) #no queremos el valor que esta en la posición 1, por lo que no se selecciona

In [34]:
parsed_data_desarrollo = parsed_data_desarrollo.filter(lambda x: (len(x)== 6)) #filtramos por los que tengan length igual a 6

In [35]:
parsed_data_desarrollo = parsed_data_desarrollo.union(desarrollo_length7) #unimos los dos rdds.

In [36]:
parsed_data_desarrollo.take(7)

[['Albania',
  'ALB',
  '2014 [YR2014]',
  '1023040800000',
  '73.3202469198429',
  '12.259'],
 ['Albania',
  'ALB',
  '2015 [YR2015]',
  '1145500940000',
  '79.8644448747259',
  '12.197'],
 ['Albania',
  'ALB',
  '2016 [YR2016]',
  '1188814490000',
  '80.7355748396923',
  '12.08'],
 ['Australia',
  'AUS',
  '2014 [YR2014]',
  '674700000',
  '0.042207528166503',
  '13.2'],
 ['Australia',
  'AUS',
  '2015 [YR2015]',
  '762718000',
  '0.0469480198522591',
  '12.9'],
 ['Australia',
  'AUS',
  '2016 [YR2016]',
  '907555000',
  '0.054648482520169',
  '12.9'],
 ['Barbados',
  'BRB',
  '2014 [YR2014]',
  '12175704025',
  '129.629591844133',
  '10.916']]

#### Parseado mediante filtro para el dataset de Deuda

In [37]:
deuda_length28 = parsed_data_deuda.filter(lambda x: (len(x) != 27))

In [38]:
deuda_length28 = deuda_length28.map(lambda x: (x[0],x[2],x[3],x[4],x[5],x[6],x[7],x[8],x[9],x[10],x[11],x[12],x[13],x[14],x[15],x[16],x[17],x[18],x[19],x[20],x[21],x[22],x[23],x[24],x[25],x[26],x[27]))

In [39]:
parsed_data_deuda = parsed_data_deuda.filter(lambda x: (len(x) == 27))

In [40]:
parsed_data_deuda = parsed_data_deuda.union(deuda_length28)

In [41]:
parsed_data_deuda.take(1)

[['Afghanistan',
  'AFG',
  '61739336.9',
  '100847181.9',
  '',
  '39107845',
  '100847181.9',
  '',
  '',
  '',
  '',
  '49114729.4',
  '72894453.7',
  '23779724.3',
  '72894453.7',
  '',
  '',
  '',
  '39903620.1',
  '53239440.1',
  '',
  '13335820',
  '53239440.1',
  '',
  '',
  '',
  '']]

In [42]:
parsed_data_desarrollo.map(lambda x: (len(x))).distinct().count()

1

In [43]:
parsed_data_deuda.map(lambda x: (len(x))).distinct().count()

1

In [44]:
parsed_data_deuda.count(), parsed_data_desarrollo.count()

(124, 134)

Se define una función para que elimine los valores nulos y pase a float todas las observaciones que deban pasarse a float.

In [46]:
def to_float(x):
    if x in ["", " ", "nan", "null", "NULL", "Null", "NaN", "Na"]:
        return 0.0
    else:
        return float(x)

##### 2. Número de países distintos en cada dataset. Coinciden?

Para conocer el número de países distintos en cada dataset y si estos coinciden, se utilizarán las siguientes funciones:
- Distinct(), esta función devuelve un nuevo RDD con valores únicos.

In [45]:
country_deuda = parsed_data_deuda.map(lambda x: x[0]).distinct() #seleeciona el valor en la posicion 0 (country) y le pedimos que devuelva los valores sin repeticiones
country_desarrollo = parsed_data_desarrollo.map(lambda x: x[0]).distinct()
(country_deuda.count(), country_desarrollo.count())

(123, 51)

Observando que el número de valores únicos en country_deuda es de 123 y de country_desarrollo es de 51, queda claro que no coinciden.

##### 3. Total de deuda contraida por cada pais: total amount of debt (in USD) DT.AMT.MLAT.CD

In [51]:
parsed_data_deuda.map(lambda x: (x[0], to_float(x[5]))).reduceByKey(lambda x, y: x+y).sortBy(lambda x: x[1], ascending = False).collect()
#mediante un map se seleccionan las columnas del país y de la variable a analizar, con esto se crea un key value.
#posteriormente se utiliza un reduce by key (por eso se hace el map anterior) para que sume los valores de la deuda (suma todos los años).
#Por último se ordena de mayor a menor por la variable de la deuda y se hace un collect para que devuelva todos los valores.

[('South Asia', 7851739929.5),
 ('IDA only', 4884095623.3),
 ('India', 4545609909.9),
 ('Least developed countries: UN classification', 4084275524.2),
 ('Turkey', 2932698678.0),
 ('China', 2615723714.1),
 ('Brazil', 2514318741.6),
 ('Indonesia', 2469145824.7),
 ('"Egypt', 2424904078.0),
 ('Romania', 1798305095.8),
 ('Pakistan', 1578651453.9),
 ('Morocco', 1381649282.3),
 ('Colombia', 1100907553.6),
 ('Kazakhstan', 1079948889.7),
 ('Ukraine', 1072472830.3),
 ('Bangladesh', 1039564682.7),
 ('Tunisia', 956604000.1),
 ('Vietnam', 947632152.5),
 ('Mexico', 864181883.5),
 ('Philippines', 739056427.3),
 ('Serbia', 715223963.7),
 ('Belarus', 595438826.0),
 ('Ecuador', 578894793.9),
 ('Peru', 553634090.0),
 ('Azerbaijan', 513623656.5),
 ('"Venezuela', 509512564.6),
 ('Sri Lanka', 449771961.1),
 ('Kenya', 391340889.9),
 ('Bolivia', 357721211.5),
 ('South Africa', 346393763.9),
 ('Guatemala', 339822253.6),
 ('El Salvador', 298939687.1),
 ('Nigeria', 297246686.6),
 ('Bosnia and Herzegovina', 29017

Otra forma de hacerlo, agrupa por país y por la variable a analizar.

In [54]:
deuda_total_pais = parsed_data_deuda.groupBy(lambda x: (x[0],to_float(x[5])))

deuda_total_pais.collect()

[(('Solomon Islands', 3952828.9),
  <pyspark.resultiterable.ResultIterable at 0x7f65ebd61550>),
 (('Vanuatu', 2593419.3),
  <pyspark.resultiterable.ResultIterable at 0x7f65ebd61588>),
 (('Least developed countries: UN classification', 4084275524.2),
  <pyspark.resultiterable.ResultIterable at 0x7f65ebd61748>),
 (('South Asia', 7851739929.5),
  <pyspark.resultiterable.ResultIterable at 0x7f65ebd616a0>),
 (('Burundi', 16115111.8),
  <pyspark.resultiterable.ResultIterable at 0x7f65ebd61898>),
 (('Uganda', 145310077.2),
  <pyspark.resultiterable.ResultIterable at 0x7f65ebd61940>),
 (('Montenegro', 77948064.8),
  <pyspark.resultiterable.ResultIterable at 0x7f65ebd619b0>),
 (('Gabon', 62404132.3),
  <pyspark.resultiterable.ResultIterable at 0x7f65ebd617b8>),
 (('"Venezuela', 509512564.6),
  <pyspark.resultiterable.ResultIterable at 0x7f65ebd61b70>),
 (('Serbia', 715223963.7),
  <pyspark.resultiterable.ResultIterable at 0x7f65ebd61c18>),
 (('Afghanistan', 39107845.0),
  <pyspark.resultiterabl

##### 4. Media de los indicadores de deuda (tabla uno): DT.AMT.BLAT.CD, DT.DIS.BLAT.CD, DT.INT.BLAT.CD

La primera forma de hacerlo es con la función mean. En este caso se seleccionan mediante un map las variables a analizar, se pasa a float mediante la función definida anteriormente y después se obtiene su media con mean().

In [53]:
print("La media de deuda para DT.AMT.BLAT.CD es {}.".format(parsed_data_deuda.map(lambda x: to_float(x[2])).mean()))
print("La media de deuda para DT.DIS.BLAT.CD es {}.".format(parsed_data_deuda.map(lambda x: to_float(x[11])).mean()))
print("La media de deuda para DT.INT.BLAT.CD es {}.".format(parsed_data_deuda.map(lambda x: to_float(x[18])).mean()))

La media de deuda para DT.AMT.BLAT.CD es 701125769.5846772.
La media de deuda para DT.DIS.BLAT.CD es 1114634998.5080643.
La media de deuda para DT.INT.BLAT.CD es 161446619.71209675.


La segunda forma de hacerlo es mediante aggregate.

In [55]:
deuda1_data = parsed_data_deuda.map(lambda x: (to_float(x[2]), to_float(x[11]), to_float(x[18])))
deuda1_data.take(5) #Se guarda en un rdd los datos de las variables a analizar para posteriormente hacer el aggregate sobre ellas

[(61739336.9, 49114729.4, 39903620.1),
 (54250280.6, 76050616.1, 13847333.6),
 (95188724.6, 10320772.2, 7680627.6),
 (8473824016.3, 8838256901.1, 1005053965.1),
 (68968314.7, 174269846.7, 24094832.0)]

In [56]:
deuda1_count_sum = deuda1_data.aggregate(
(0,0,0,0), #valor inicial, las posiciones x[0]x[1]x[2] serán la suma de las deudas y x[3] será las veces que se suma, un contador, cuenta la cantidad de iteracciones realizadas
    (lambda acc, value: (acc[0] + value[0], acc[1] + value[1], acc[2] + value[2], acc[3] + 1)), #Sobre la posiciones de las deudas va sumando los valores y por ultimo se va sumando el contador
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1], acc1[2] + acc2[2], acc1[3] + acc2[3])) #Se realizan los acumulados, sobre el valor anterior sumado, se suma el nuevo y lo mismo para el contador
)
print("La media de deuda para DT.AMT.BLAT.CD es {}.".format(deuda1_count_sum[0]/deuda1_count_sum[3])) #Se divide el valor de la deuda (total) entre el contador para obtener la media
print("La media de deuda para DT.DIS.BLAT.CD es {}.".format(deuda1_count_sum[1]/deuda1_count_sum[3]))
print("La media de deuda para DT.INT.BLAT.CD es {}.".format(deuda1_count_sum[2]/deuda1_count_sum[3]))

La media de deuda para DT.AMT.BLAT.CD es 701125769.5846775.
La media de deuda para DT.DIS.BLAT.CD es 1114634998.5080647.
La media de deuda para DT.INT.BLAT.CD es 161446619.71209678.


##### 5. Los 20 paises con DT.AMT.DLXF.CD más alto

Para seleccionar los 20 países con el valor más alto, se realiza una función sortBy para ordenar de mayor a menor en función de la columna DT.AMT.DLXF.CD, después se realiza un map para que devuelva solo los valores de la primera columna (país) y la columna objetivo. Esta columna se pasa a float mediante la función anteriormente creada para que los valores nulos los cuente como 0.0 y luego pase los valores a float.
De este modo se obtiene el resultado.

In [57]:
parsed_data_deuda.sortBy(lambda x: to_float(x[3]), ascending = False).\
                     map(lambda x: (x[0], to_float(x[3]))).take(20)
    #Se ordenan de mayor a menor por la variable a analizar (pasa a float mediante la función)
    #Posteriormente con un map se seleccionan las variables que nos interesan que son el país (x[0]) y la variable a analizar x[3]
    #Con take se devuelven los 20 valores más altos.

[('China', 96218620835.7),
 ('Brazil', 90041840304.1),
 ('Russian Federation', 66589761833.5),
 ('Turkey', 51555031005.8),
 ('South Asia', 48756295898.2),
 ('India', 31923507000.8),
 ('Indonesia', 30916112653.8),
 ('Kazakhstan', 27482093686.4),
 ('Mexico', 25218503927.0),
 ('Least developed countries: UN classification', 25197029299.4),
 ('IDA only', 20483289208.0),
 ('Romania', 14013783350.4),
 ('Colombia', 11985674438.7),
 ('Angola', 11067045628.1),
 ('Cameroon', 10404814960.2),
 ('"Venezuela', 9878659207.2),
 ('"Egypt', 9692114176.9),
 ('Lebanon', 9506919669.6),
 ('South Africa', 9474257551.9),
 ('Vietnam', 8873505909.2)]

##### 6. Pais con los datos informados todos los años.

El objetivo es primero conocer los años en los que hay datos, estos son 4 años. Después se trata de devolver países que han reportado algún dato durante todos los años. Para ello se buscará los paises en los que el número de años en los que ha reportado datos sea igual a 4.

In [58]:
parsed_data_desarrollo.map(lambda x: x[2]).distinct().collect() #se observa que hay 4 años distinots en el rdd

['2014 [YR2014]', '2015 [YR2015]', '2016 [YR2016]', '2017 [YR2017]']

In [59]:
parsed_data_desarrollo.map(lambda x: x[2]).distinct().count()

4

In [61]:
parsed_data_desarrollo.map(lambda x: (x[0], 1)).\
    reduceByKey(lambda x, y: x+y).\
    filter(lambda x: x[1] == 4).take(5)
    
    #Se selecciona la variable x[0], el país y se añade al lado un 1. Este uno indicará las veces que aparece, que debe ser 4 veces (por cada año)
    #Se realiza un reduceByKey para que agrupe por la key (x[0]) y sume el valor a su lado, en este caso va sumando 1 por cada vez que lo agrupa
    #se filtra para que devuelva los países donde su conteo es = 4 (aparece 4 veces, uno por cada año)

[('Malawi', 4)]

##### 7. Media anual de los distintos indicadores de desarrollo

El proceso para obtener la media anual de cada indicador será primero filtrar por cada año para realizar el cálculo con cada filtrado. Después se hara la fórmula de aggregate para que vaya sumando el valor de cada indicador y también añada un contador con las veces que va realizando el proceso, para finalmente calcular la media.

#### Cálculo para 2014

In [62]:
desarrollo_2014 = parsed_data_desarrollo.filter(lambda x: x[2] == "2014 [YR2014]").\
    map(lambda x: (to_float(x[3]), to_float(x[4]), to_float(x[5])))
    #filtrado para que en la fecha devuelva los datos de 2014.
    #se hace un map para que devuelva las variables que se van a analizar

In [63]:
desarrollo2014_count_sum = desarrollo_2014.aggregate(
(0,0,0,0), #valor inicial, la posicion x[0]x[1]x[2] serán la suma de las variables y x[3] será las veces que se suma, un contador, cuenta la cantidad de iteracciones
    (lambda acc, value: (acc[0] + value[0], acc[1] + value[1], acc[2] + value[2], acc[3] + 1)),#Es el mismo proceso que el ejercicio 4
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1], acc1[2] + acc2[2], acc1[3] + acc2[3]))
)
print("La media para GC.DOD.TOTL.CN en 2014 es {}.".format(desarrollo2014_count_sum[0]/desarrollo2014_count_sum[3]))
print("La media para GC.DOD.TOTL.GD.ZS en 2014 es {}.".format(desarrollo2014_count_sum[1]/desarrollo2014_count_sum[3]))
print("La media para SP.DYN.CBRT.IN en 2014 es {}.".format(desarrollo2014_count_sum[2]/desarrollo2014_count_sum[3]))

La media para GC.DOD.TOTL.CN en 2014 es 103442758732953.23.
La media para GC.DOD.TOTL.GD.ZS en 2014 es 56.8439143443844.
La media para SP.DYN.CBRT.IN en 2014 es 17.582620000000002.


#### Cálculo para 2015

In [64]:
desarrollo_2015 = parsed_data_desarrollo.filter(lambda x: x[2] == "2015 [YR2015]").\
    map(lambda x: (to_float(x[3]), to_float(x[4]), to_float(x[5])))

In [65]:
desarrollo2015_count_sum = desarrollo_2015.aggregate(
(0,0,0,0), #valor inicial, la posicion x[0] será la suma de deuda y x[1] será las veces que se suma, como un count, cuenta la cantidad de datos que hay
    (lambda acc, value: (acc[0] + value[0], acc[1] + value[1], acc[2] + value[2], acc[3] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1], acc1[2] + acc2[2], acc1[3] + acc2[3]))
)
print("La media para GC.DOD.TOTL.CN en 2015 es {}.".format(desarrollo2015_count_sum[0]/desarrollo2015_count_sum[3]))
print("La media para GC.DOD.TOTL.GD.ZS en 2015 es {}.".format(desarrollo2015_count_sum[1]/desarrollo2015_count_sum[3]))
print("La media para SP.DYN.CBRT.IN en 2015 es {}.".format(desarrollo2015_count_sum[2]/desarrollo2015_count_sum[3]))

La media para GC.DOD.TOTL.CN en 2015 es 131824403915902.5.
La media para GC.DOD.TOTL.GD.ZS en 2015 es 58.36559834488034.
La media para SP.DYN.CBRT.IN en 2015 es 16.798477272727272.


#### Cálculo para 2016

In [66]:
desarrollo_2016 = parsed_data_desarrollo.filter(lambda x: x[2] == "2016 [YR2016]").\
    map(lambda x: (to_float(x[3]), to_float(x[4]), to_float(x[5])))

In [67]:
desarrollo2016_count_sum = desarrollo_2016.aggregate(
(0,0,0,0), #valor inicial, la posicion x[0] será la suma de deuda y x[1] será las veces que se suma, como un count, cuenta la cantidad de datos que hay
    (lambda acc, value: (acc[0] + value[0], acc[1] + value[1], acc[2] + value[2], acc[3] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1], acc1[2] + acc2[2], acc1[3] + acc2[3]))
)
print("La media para GC.DOD.TOTL.CN en 2016 es {}.".format(desarrollo2016_count_sum[0]/desarrollo2016_count_sum[3]))
print("La media para GC.DOD.TOTL.GD.ZS en 2016 es {}.".format(desarrollo2016_count_sum[1]/desarrollo2016_count_sum[3]))
print("La media para SP.DYN.CBRT.IN en 2016 es {}.".format(desarrollo2016_count_sum[2]/desarrollo2016_count_sum[3]))

La media para GC.DOD.TOTL.CN en 2016 es 158813821548065.94.
La media para GC.DOD.TOTL.GD.ZS en 2016 es 58.796949624008356.
La media para SP.DYN.CBRT.IN en 2016 es 16.144333333333332.


#### Cálculo para 2017

In [68]:
desarrollo_2017 = parsed_data_desarrollo.filter(lambda x: x[2] == "2017 [YR2017]").\
    map(lambda x: (to_float(x[3]), to_float(x[4]), to_float(x[5])))

In [69]:
desarrollo2017_count_sum = desarrollo_2017.aggregate(
(0,0,0,0), #valor inicial, la posicion x[0] será la suma de deuda y x[1] será las veces que se suma, como un count, cuenta la cantidad de datos que hay
    (lambda acc, value: (acc[0] + value[0], acc[1] + value[1], acc[2] + value[2], acc[3] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1], acc1[2] + acc2[2], acc1[3] + acc2[3]))
)
print("La media para GC.DOD.TOTL.CN en 2017 es {}.".format(desarrollo2017_count_sum[0]/desarrollo2017_count_sum[3]))
print("La media para GC.DOD.TOTL.GD.ZS en 2017 es {}.".format(desarrollo2017_count_sum[1]/desarrollo2017_count_sum[3]))
print("La media para SP.DYN.CBRT.IN en 2017 es {}.".format(desarrollo2017_count_sum[2]/desarrollo2017_count_sum[3]))

La media para GC.DOD.TOTL.CN en 2017 es 2239858619962.69.
La media para GC.DOD.TOTL.GD.ZS en 2017 es 48.659564238322.
La media para SP.DYN.CBRT.IN en 2017 es 34.593.


##### 8. Podrías decirme el total de deuda acumulada DT.AMT.MLAT.CD por los 10 países con un valor en media menor de SP.DYN.CBRT.IN

Se busca devolver la deuda acumulada DT.AMT.MLAT.CD de los 10 países que tienen una menor media en SP.DYN.CBRT.IN.

Por lo tanto los pasos a seguir serán obtener la media de SP.DYN.CBRT.IN de cada país y obtener los 10 países que tienen una menor media. Después se calculará el total de deuda acumulada de esos 10 países.

In [74]:
key_value_data = parsed_data_desarrollo.map(lambda x: (x[0],to_float(x[5]))) #se hace un map para hacer un key value. key es el país y el value será la variable a analizar
key_value_data.take(5)

[('Albania', 12.259),
 ('Albania', 12.197),
 ('Albania', 12.08),
 ('Australia', 13.2),
 ('Australia', 12.9)]

In [75]:
mean_by_key = key_value_data.reduceByKey(lambda x, y: x+y).\
    map(lambda x: (x[0], (x[1]/3)))
mean_by_key.collect() 

#se realiza un reduce by key para que sume por cada país la variable a analizar
#de este modo se obtiene el birth rate total del país. posteriormente se divide entre la cantidad de veces que se ha realizado la iteraccion.
#No se como realizarlo con un aggregate ya que está agrupado por países y no es sobre el total de la variable.
#se divide entre 3 ya que menos un país, el resto solo reportó datos en 3 años.

[('Hungary', 9.533333333333333),
 ('Guatemala', 25.628666666666664),
 ('Belize', 7.355666666666667),
 ('Zambia', 25.531666666666666),
 ('Ukraine', 10.6),
 ('Botswana', 26.417666666666666),
 ('Switzerland', 10.466666666666667),
 ('Uruguay', 14.127666666666668),
 ('United States', 12.366666666666665),
 ('Palau', 13.133333333333333),
 ('Russian Federation', 13.166666666666666),
 ('Jamaica', 16.63566666666667),
 ('Namibia', 10.064),
 ('St. Lucia', 4.167333333333334),
 ('Singapore', 9.633333333333333),
 ('New Zealand', 12.866666666666667),
 ('United Kingdom', 11.9),
 ('Turkey', 16.717333333333332),
 ('Seychelles', 11.333333333333334),
 ('Spain', 9.0),
 ('Moldova', 10.604333333333335),
 ('Papua New Guinea', 18.779),
 ('Jordan', 16.203333333333333),
 ('Belarus', 12.466666666666667),
 ('Japan', 7.933333333333334),
 ('Georgia', 14.056),
 ('El Salvador', 18.733999999999998),
 ('St. Vincent and the Grenadines', 5.003666666666667),
 ('Australia', 13.0),
 ('Brazil', 14.467666666666666),
 ('Barbados

In [76]:
mean_by_key.sortBy(lambda x: x[1], ascending = True).take(10) #se ordena las medias en sentido descendente y se devuelven los 10 valores más bajos.

[('St. Lucia', 4.167333333333334),
 ('St. Kitts and Nevis', 4.2),
 ('St. Vincent and the Grenadines', 5.003666666666667),
 ('San Marino', 5.366666666666667),
 ('Barbados', 7.240666666666667),
 ('Thailand', 7.278333333333333),
 ('Belize', 7.355666666666667),
 ('Philippines', 7.645),
 ('Japan', 7.933333333333334),
 ('"Korea', 8.366666666666667)]