En esta notebook explico cómo analizar el dataset Spark Fund Investment Analysis utilizando **Google Colab** como entorno de desarrollo y **Spark** como framework.

*Aclaración*: El código a continuación fue escrito para ejecutarse en un entorno de Google Colab.

In [None]:
#Instalamos kaggle
!pip install kaggle

In [None]:
# Mount Google Drive so files stored there are reachable from this runtime
from google.colab import drive

DRIVE_MOUNT_POINT = "/content/drive"
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
#Creamos un directorio
!mkdir ~/.kaggle

In [None]:
# Copy kaggle.json from the KaggleAPI folder of the mounted Drive into ~/.kaggle
! cp /content/drive/MyDrive/KaggleAPI/kaggle.json ~/.kaggle/kaggle.json

In [None]:
# Restrict kaggle.json to read/write for the owner only (mode 600)
! chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the dataset archive with the Kaggle CLI into the current working directory
! kaggle datasets download pranay969/spark-fund-investment-analysis

In [None]:
#Deszipeamos el dataset
! unzip spark-fund-investment-analysis.zip

In [None]:
#Instalamos Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

#Descargamos Spark
!wget -q https://downloads.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz
#Dezipeamos el archivo descargado
!tar xf spark-3.2.3-bin-hadoop3.2.tgz
#verificamos la versión de spark que tenemos
!spark-submit --version

#Utilizamos la versión de spark para saber que versión de pyspark instalar
!pip install pyspark==3.2.3 #Cambiar a la versión de spark 
#Verificamos que la versión de pyspark sea la correcta
!pyspark --version

#Instalación de Spark en Python
!pip install -q findspark

#Configuración de variables de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop3.2"

#Importamos SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
#Verificación de la sesión de Spark
spark

In [None]:
#Importamos las librerias a utilizar
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, DoubleType

In [None]:
#Importamos pandas
import pandas as pd

In [None]:
# First pass over rounds2.csv: no schema is supplied, we only want to look at
# the raw rows and the column types Spark assigns.
rounds2_reader = spark.read.format("csv").options(header="true", delimiter=",")
dfRounds2 = rounds2_reader.load("/content/datasets/rounds2.csv")

dfRounds2.show(truncate=False)

dfRounds2.printSchema()

**Verificamos que todos los valores de la columna raised_amount_usd sean numéricos**

In [None]:
# Explicit schema for rounds2.csv: five nullable string columns followed by
# the raised amount as a nullable double.
rounds2_text_columns = [
    "company_permalink",
    "funding_round_permalink",
    "funding_round_type",
    "funding_round_code",
    "funded_at",
]
rounds2_schema = StructType(
    [StructField(col_name, StringType(), True) for col_name in rounds2_text_columns]
    + [StructField("raised_amount_usd", DoubleType(), True)]
)

# Re-read the file, this time applying the schema defined above
dfRounds2 = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .schema(rounds2_schema)
    .load("/content/datasets/rounds2.csv")
)

dfRounds2.show()

dfRounds2.printSchema()

dfRounds2.count()

In [None]:
# Clean rounds2: keep rows where every column is non-null and the raised
# amount is strictly positive, then drop duplicate rows.
rounds2_required_columns = [
    "company_permalink",
    "funding_round_permalink",
    "funding_round_type",
    "funding_round_code",
    "funded_at",
    "raised_amount_usd",
]
rounds2_conditions = [
    dfRounds2[required_column].isNotNull()
    for required_column in rounds2_required_columns
]
rounds2_conditions.append(dfRounds2["raised_amount_usd"] > 0)

# AND all the individual conditions together, in the order listed above
rounds2_row_is_valid = rounds2_conditions[0]
for rounds2_condition in rounds2_conditions[1:]:
    rounds2_row_is_valid = rounds2_row_is_valid & rounds2_condition

dfRounds2Limpio = dfRounds2.where(rounds2_row_is_valid).distinct()

dfRounds2Limpio.show()
dfRounds2Limpio.count()

In [None]:
# First pass over mapping.csv without a schema, to inspect rows and column types
mapping_reader = spark.read.format("csv").options(header="true", delimiter=",")
dfMapping = mapping_reader.load("/content/datasets/mapping.csv")

dfMapping.show()

dfMapping.printSchema()

In [None]:
# Explicit schema for mapping.csv: the category name as a string, followed by
# one nullable integer column per sector.
mapping_sector_columns = [
    "Automotive & Sports",
    "Blanks",
    "Cleantech / Semiconductors",
    "Entertainment",
    "Health",
    "Manufacturing",
    "News, Search and Messaging",
    "Others",
    "Social, Finance, Analytics, Advertising",
]
mapping_schema = StructType(
    [StructField("category_list", StringType(), True)]
    + [StructField(sector_name, IntegerType(), True) for sector_name in mapping_sector_columns]
)

# Re-read the file with the schema applied
dfMapping = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .schema(mapping_schema)
    .load("/content/datasets/mapping.csv")
)

dfMapping.show()

dfMapping.printSchema()

In [None]:
# Clean mapping: keep rows where every column is non-null, then drop duplicates
mapping_required_columns = [
    "category_list",
    "Automotive & Sports",
    "Blanks",
    "Cleantech / Semiconductors",
    "Entertainment",
    "Health",
    "Manufacturing",
    "News, Search and Messaging",
    "Others",
    "Social, Finance, Analytics, Advertising",
]

# AND the per-column "is not null" checks together, in the order listed above
mapping_row_is_complete = dfMapping[mapping_required_columns[0]].isNotNull()
for mapping_column in mapping_required_columns[1:]:
    mapping_row_is_complete = mapping_row_is_complete & dfMapping[mapping_column].isNotNull()

dfMappingLimpio = dfMapping.where(mapping_row_is_complete).distinct()

dfMappingLimpio.show()

dfMappingLimpio.printSchema()

In [None]:
# Read companies.txt, a tab-separated file with a header row.
# The delimiter is written as the "\t" escape instead of a raw tab character
# inside the string: the value is the same, but it stays visible and cannot be
# silently turned into spaces by an editor or a copy/paste.
dfCompanies = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "\t")
    .load("/content/datasets/companies.txt")
)

dfCompanies.show()

dfCompanies.printSchema()

In [None]:
# Three-letter country codes treated in this analysis as countries whose
# official language is English.
# NOTE(review): USA, GBR, IND and AUS are not in this list — confirm that is intentional.
list_of_countries = [
    "KEN", "NGA", "GHA", "MWI", "ZMB", "FJI", "ZAF", "SGP",
    "PNG", "IRL", "DMA", "LCA", "ATG", "BHS", "BRB", "BLZ",
    "GRD", "GUY", "JAM", "KNA", "VCT", "SLE", "TTO", "CAN",
    "BWA", "GMB", "LBR", "MUS", "FSM", "SLB", "SSD",
]

In [None]:
# Clean companies: keep rows that have a name and whose country code is
# present and belongs to list_of_countries.
company_has_name = dfCompanies["name"].isNotNull()
company_has_country = dfCompanies["country_code"].isNotNull()
company_in_selected_country = dfCompanies["country_code"].isin(list_of_countries)

dfCompaniesLimpio = dfCompanies.where(
    company_has_name & company_has_country & company_in_selected_country
)

dfCompaniesLimpio.show()

dfCompaniesLimpio.printSchema()