## Create Spark Session

In [2]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}


In [3]:
val conf = new SparkConf().setAll(Map(
      "spark.scheduler.mode" -> "FIFO",
      "spark.speculation" -> "false",
      "spark.reducer.maxSizeInFlight" -> "48m",
      "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
      "spark.kryoserializer.buffer.max" -> "1g",
      "spark.shuffle.file.buffer" -> "32k",
      "spark.default.parallelism" -> "12",
      "spark.sql.shuffle.partitions" -> "12"
    ))

conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@322e2aa


In [4]:
 val spark = SparkSession
      .builder
      .config(conf)
      .appName("TP Spark : Preprocessor")
      .getOrCreate()

import spark.implicits._  // to use the symbol $

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@21f42ccf
import spark.implicits._


## Download masterfile

In [11]:
import sys.process._
import java.net.URL
import java.io.File
import java.io.File
import java.nio.file.{Files, StandardCopyOption}
import java.net.HttpURLConnection 
import org.apache.spark.sql.functions._


def fileDownloader(urlOfFileToDownload: String, fileName: String) = {
    val url = new URL(urlOfFileToDownload)
    val connection = url.openConnection().asInstanceOf[HttpURLConnection]
    connection.setConnectTimeout(5000)
    connection.setReadTimeout(5000)
    connection.connect()

    if (connection.getResponseCode >= 400)
        println("error")
    else
        url #> new File(fileName) !!
}

import sys.process._
import java.net.URL
import java.io.File
import java.io.File
import java.nio.file.{Files, StandardCopyOption}
import java.net.HttpURLConnection
import org.apache.spark.sql.functions._
fileDownloader: (urlOfFileToDownload: String, fileName: String)Any


In [5]:
fileDownloader("http://data.gdeltproject.org/gdeltv2/masterfilelist.txt", "./data/masterfilelist.txt") // save the list file to the Spark Master

res0: Any = ""


## Download data for one day

In [5]:
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val filesDF = sqlContext.read.
                    option("delimiter"," ").
                    option("infer_schema","true").
                    csv("./data/masterfilelist.txt").
                    withColumnRenamed("_c0","size").
                    withColumnRenamed("_c1","hash").
                    withColumnRenamed("_c2","url").
                    cache

import org.apache.spark.sql.SQLContext
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@3eab5610
filesDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [size: string, hash: string ... 1 more field]


In [6]:
filesDF.show(false)

+--------+--------------------------------+--------------------------------------------------------------------+
|size    |hash                            |url                                                                 |
+--------+--------------------------------+--------------------------------------------------------------------+
|150383  |297a16b493de7cf6ca809a7cc31d0b93|http://data.gdeltproject.org/gdeltv2/20150218230000.export.CSV.zip  |
|318084  |bb27f78ba45f69a17ea6ed7755e9f8ff|http://data.gdeltproject.org/gdeltv2/20150218230000.mentions.CSV.zip|
|10768507|ea8dde0beb0ba98810a92db068c0ce99|http://data.gdeltproject.org/gdeltv2/20150218230000.gkg.csv.zip     |
|149211  |2a91041d7e72b0fc6a629e2ff867b240|http://data.gdeltproject.org/gdeltv2/20150218231500.export.CSV.zip  |
|339037  |dec3f427076b716a8112b9086c342523|http://data.gdeltproject.org/gdeltv2/20150218231500.mentions.CSV.zip|
|10269336|2f1a504a3c4558694ade0442e9a5ae6f|http://data.gdeltproject.org/gdeltv2/20150218231500.g

In [7]:
val sampleDF = filesDF.filter(col("url").contains("/20181201")).cache

sampleDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [size: string, hash: string ... 1 more field]


In [8]:
sampleDF.show()

+--------+--------------------+--------------------+
|    size|                hash|                 url|
+--------+--------------------+--------------------+
|  286749|7745b74ca805d90a8...|http://data.gdelt...|
|  249784|95ea33b0393a85214...|http://data.gdelt...|
| 9694304|c0a3d85a1f3a5263b...|http://data.gdelt...|
|  250319|47317d0bd9cd56e7b...|http://data.gdelt...|
|  304863|37513cccbabc99a9e...|http://data.gdelt...|
|10119350|50c3fa1e60385a2ca...|http://data.gdelt...|
|  224825|8ca8b3a6b4ff6a960...|http://data.gdelt...|
|  298101|68369aec62706a2b5...|http://data.gdelt...|
| 9705075|8cd7aa2af725aa02b...|http://data.gdelt...|
|  191077|906e6599092855ba2...|http://data.gdelt...|
|  252312|de9048e047049b3b6...|http://data.gdelt...|
| 9370422|5a674db2a8d110d27...|http://data.gdelt...|
|  187888|cee25cd1ab712b7ac...|http://data.gdelt...|
|  274338|e12ffacb37d73d9c9...|http://data.gdelt...|
|10008236|48764cfe5bc415441...|http://data.gdelt...|
|  145719|df1010eb1a3466fef...|http://data.gde

In [9]:
sampleDF.count()

res3: Long = 288


In [12]:
sampleDF.select("url").repartition(100).foreach( r=> {
            val URL = r.getAs[String](0)
            val fileName = r.getAs[String](0).split("/").last
            val dir = "./data/"
            val localFileName = dir + fileName
            fileDownloader(URL, localFileName)
})