# Report 6 - data acquisition

## Environment

We have an Ubuntu virtual machine provisioned with Vagrant (which uses VirtualBox under the hood). The Docker containers run on this virtual machine.

To make later work with the Hadoop images easier, we added a data volume to the master node (by modifying the scripts that generate docker-compose). This lets us conveniently move files (simply by dropping them into the right folder) to a location that is reachable from the Hadoop machine. Note that the virtual machine itself also has such a volume, which is provided by Vagrant.

```yaml
master:
    image: hjben/hadoop-eco:$hadoop_version
    hostname: master
    container_name: master
    privileged: true
    ports:
      - 8088:8088
      - 9870:9870
      - 8042:8042
      - 10000:10000
      - 10002:10002
      - 16010:16010
    volumes:
      - /sys/fs/cgroup:/sys/fs/cgroup
      - $hdfs_path:/data/hadoop
      - $hadoop_log_path:/usr/local/hadoop/logs
      - $hbase_log_path/master:/usr/local/hbase/logs
      - $hive_log_path:/usr/local/hive/logs
      - $sqoop_log_path:/usr/local/sqoop/logs
      - /vagrant/master_volume:/data/master_volume  # added volume
    networks:
      hadoop-cluster:
        ipv4_address: 10.1.2.3
    extra_hosts:
      - "mariadb:10.1.2.2"
      - "master:10.1.2.3"
```

## Downloading the data

Unfortunately, because a Kaggle API key has to be generated and supplied, a few steps are required before the data can be downloaded.

1. Download the API key (kaggle.json) from the Kaggle website.
2. Create a `.kaggle` folder in the user's home directory and copy the API key there.
3. The functions below:
    1. Download from Kaggle:
       * [YouTube Trending Video Dataset](https://www.kaggle.com/datasets/rsrishav/youtube-trending-video-dataset)
       * [Steam Dataset](https://www.kaggle.com/datasets/souyama/steam-dataset)
    2. Download the [COVID data](https://covid.ourworldindata.org/data/owid-covid-data.csv) from the web.
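
The credential steps above can also be checked from code before any download is attempted. The sketch below is not part of the original notebook: the helper name `load_kaggle_credentials` is hypothetical, and it assumes that the Kaggle client picks up credentials from the `KAGGLE_USERNAME` and `KAGGLE_KEY` environment variables and that kaggle.json contains the fields `username` and `key`.

```python
import json
import os
from pathlib import Path


def load_kaggle_credentials(path=None, env=os.environ):
    """Read kaggle.json and export its fields as environment variables.

    `path` defaults to ~/.kaggle/kaggle.json (the location from step 2).
    Returns True when credentials were loaded, False when the file is
    missing or does not contain both expected fields.
    """
    path = Path(path) if path is not None else Path.home() / ".kaggle" / "kaggle.json"
    if not path.is_file():
        return False
    with open(path, "r", encoding="utf-8") as f:
        creds = json.load(f)
    if "username" not in creds or "key" not in creds:
        return False
    # Assumption: the Kaggle client reads these two variables
    env["KAGGLE_USERNAME"] = creds["username"]
    env["KAGGLE_KEY"] = creds["key"]
    return True


if not load_kaggle_credentials():
    print("kaggle.json not found or incomplete, the download will ask for credentials")
```

Calling such a helper before the download cells would make a missing key visible immediately instead of at the interactive prompt.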

In [1]:
# json and csv are part of the Python standard library and need no installation
%pip install docker
%pip install opendatasets
%pip install pandas
%pip install paramiko

Collecting docker
  Downloading docker-6.1.2-py3-none-any.whl (148 kB)
Installing collected packages: docker
Successfully installed docker-6.1.2
Note: you may need to restart the kernel to use updated packages.
Collecting opendatasets
  Downloading opendatasets-0.1.22-py3-none-any.whl (15 kB)
Collecting kaggle (from opendatasets)
  Downloading kaggle-1.5.13.tar.gz (63 kB)
  Preparing metadata (setup.py) ... done
Collecting click (from opendatasets)
  Downloading

In [7]:
import os
from timeit import default_timer as timer
import requests
import docker
import json
import opendatasets as od
import csv
import pandas

In [8]:
if os.getcwd().startswith("/tmp"):
    os.chdir("/vagrant/sprawozdania/akwizycja")

In [9]:
output_dir = "../../master_volume/datasets"

In [10]:
%%timeit -r 1 -n 1
if not os.path.isdir(f"{output_dir}/youtube-trending-video-dataset"):
    od.download("https://www.kaggle.com/datasets/rsrishav/youtube-trending-video-dataset", f"{output_dir}")
else:
    print("Dataset youtube-trending-video-dataset already exists, skipping download")

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username: a
Your Kaggle Key: ········
Your Kaggle Key: ········
Your Kaggle Key: ········


ApiException: (401)
Reason: Unauthorized


In [5]:
%%timeit -r 1 -n 1
if not os.path.isdir(f"{output_dir}/steam-dataset"):
    od.download("https://www.kaggle.com/datasets/souyama/steam-dataset", f"{output_dir}")
else:
    print("Dataset steam-dataset already exists, skipping download")

Dataset steam-dataset already exists, skipping download
465 µs ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [6]:
%%timeit -r 1 -n 1
path = f"{output_dir}/covid-dataset.csv"

if not os.path.isfile(path):
    print(f"Downloading covid-dataset to {path}")
    start = timer()
    r = requests.get("https://covid.ourworldindata.org/data/owid-covid-data.csv", allow_redirects=True)
    with open(path, 'wb') as file:
        file.write(r.content)
    end = timer()
    print(f"Download finished in {end - start:.02f}s")
else:
    print("Dataset covid-dataset already exists, skipping download")

print(f"covid-dataset.csv: {os.stat(path).st_size / (1024 * 1024):.02f}MB")

Dataset covid-dataset already exists, skipping download
covid-dataset.csv: 77.76MB
871 µs ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
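
The cell above reads the whole response into memory before writing it to disk. For a file of this size that works, but a streaming variant is easy to sketch. The version below is an illustration only: it swaps `requests` for the standard-library `urllib.request` so that it has no third-party dependencies, and the helper name `download_to_file` is hypothetical.

```python
import os
import shutil
import tempfile
import urllib.request


def download_to_file(url, path, chunk_size=1024 * 1024):
    """Stream `url` to `path` in chunks and return the resulting file size."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temporary file first, so a failed download never
    # leaves a partial file under the final name.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as out, urllib.request.urlopen(url) as response:
            shutil.copyfileobj(response, out, chunk_size)
        os.replace(tmp_path, path)
    except BaseException:
        os.remove(tmp_path)
        raise
    return os.stat(path).st_size


# Possible usage with the URL from this report:
# download_to_file("https://covid.ourworldindata.org/data/owid-covid-data.csv",
#                  "../../master_volume/datasets/covid-dataset.csv")
```

Because `urlopen` raises an exception on HTTP error statuses, an error page is never saved as if it were the dataset.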


## Formatting the data

After unpacking the data it turns out that part of it is in a format that is awkward to work with later. In the end we decided to convert all files to the .jsonl format before uploading them to HDFS. A .jsonl file contains JSON objects, one per line. Because each line can be processed independently, this format makes it easy to implement map-reduce processes.
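
To make the target format concrete, here is a minimal sketch of a CSV to JSONL conversion. It is not the notebook's implementation: it streams rows with `csv.DictReader` instead of loading them into pandas, the helper name `csv_to_jsonl` is hypothetical, and the sample rows are made up.

```python
import csv
import io
import json


def csv_to_jsonl(infile, outfile):
    """Copy rows from a CSV stream to a JSONL stream, one JSON object per line."""
    count = 0
    for row in csv.DictReader(infile):
        json.dump(row, outfile)
        outfile.write("\n")
        count += 1
    return count


# Made-up sample data, only to show the shape of the output
source = io.StringIO("appid,name\n10,Counter-Strike\n20,Team Fortress Classic\n")
target = io.StringIO()
csv_to_jsonl(source, target)
print(target.getvalue(), end="")
# {"appid": "10", "name": "Counter-Strike"}
# {"appid": "20", "name": "Team Fortress Classic"}
```

Note that all values come out as strings, because CSV carries no type information.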


In [7]:
%%timeit -r 1 -n 1

print("Converting CSV to JSONL")
for root, directories, files in os.walk(output_dir):
    for filename in files:
        if not filename.endswith(".csv"):
            continue

        path = os.path.join(root, filename)
        # Swap only the extension; str.replace would also touch ".csv" inside directory names
        output_path = os.path.splitext(path)[0] + ".jsonl"

        if os.path.isfile(output_path):
            continue  # already converted

        try:
            print(path)
            with open(path, 'r', encoding='utf-8', errors='replace') as infile, open(output_path, 'w', encoding='utf-8', errors='replace') as outfile:
                # Strip NUL characters, which older versions of csv.reader reject
                reader = csv.reader(x.replace('\0', '') for x in infile)
                headers = next(reader)
                data = list(reader)
                df = pandas.DataFrame(data, columns=headers)
                for row in df.to_dict('records'):
                    json.dump(row, outfile)
                    outfile.write('\n')
            print(output_path)
        except Exception as e:
            print(e)

Converting CSV to JSONL
8.41 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [8]:
%%timeit -r 1 -n 1

print("Converting JSON to JSONL")
for root, directories, files in os.walk(output_dir):
    for filename in files:
        if not filename.endswith(".json"):
            continue

        path = os.path.join(root, filename)
        output_path = f"{path}l"  # foo.json -> foo.jsonl

        if os.path.isfile(output_path):
            continue  # already converted

        print(path)
        with open(path, "r", encoding="utf-8") as file:
            data = json.load(file)

        # A top-level object becomes one {"key", "value"} record per entry
        if isinstance(data, dict):
            data = [{"key": key, "value": value} for key, value in data.items()]

        with open(output_path, "w", encoding="utf-8") as jsonl_file:
            for obj in data:
                json.dump(obj, jsonl_file)
                jsonl_file.write("\n")
        print(output_path)

print("Done")

Converting JSON to JSONL
Done
8.34 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
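
The JSON conversion cell treats two shapes of input differently: a top-level list is written element by element, while a top-level object is first turned into key/value records. The sketch below isolates that rule on made-up data; the helper names `to_records` and `to_jsonl` are hypothetical and exist only for this illustration.

```python
import json


def to_records(data):
    """Return the list of objects that will become JSONL lines."""
    if isinstance(data, dict):
        # One record per entry of the top-level object
        return [{"key": key, "value": value} for key, value in data.items()]
    return data


def to_jsonl(data):
    """Serialize the records, one JSON document per line."""
    return "".join(json.dumps(obj) + "\n" for obj in to_records(data))


# Made-up sample: an object keyed by application id
print(to_jsonl({"10": {"name": "Counter-Strike"}}), end="")
# {"key": "10", "value": {"name": "Counter-Strike"}}
```

Keeping the original key inside each record means no information is lost when the object is split into lines.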


## Adding the files to HDFS

In [12]:
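client = docker.from_env()
# The container name comes from `container_name: master` in docker-compose
container = client.containers.get('master')

def hdfs_mkdir(path):
    # -p creates missing parent directories and does not fail if the directory exists
    container.exec_run(f"hdfs dfs -mkdir -p /{path}/")

def hdfs_upload(path):
    # `path` is relative to the shared volume mounted at /data/master_volume
    directory = "/".join(path.split("/")[:-1])
    hdfs_mkdir(directory)
    cmd = f"hdfs dfs -put /data/master_volume/{path} /{directory}"
    print(cmd)
    code, output = container.exec_run(cmd)
    print(f"exit code {code}")
    print(output)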

DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))

In [None]:
%%timeit -r 1 -n 1

print("Uploading to HDFS")

for root, directories, files in os.walk(output_dir):
    for filename in files:
        if filename.endswith(".json"):
            continue  # skip JSON files, we have JSONL from the previous step
        filepath = os.path.join(root, filename)

        # Path relative to the shared volume, with forward slashes
        path = filepath.replace("../../master_volume/", "").replace("\\", "/")

        start_single = timer()
        hdfs_upload(path)
        end_single = timer()
        print(f"HDFS upload of {os.stat(filepath).st_size / (1024 * 1024):.02f}MB took {end_single - start_single:.02f}s")
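
The upload helpers build their commands as plain strings, which can break when a file name contains spaces. `exec_run` in the Docker SDK also accepts a command as a list of arguments, so one possible variant is to build argument lists instead. The sketch below only constructs the commands and does not talk to Docker; the helper name `hdfs_put_commands` and the example file name are hypothetical.

```python
import posixpath


def hdfs_put_commands(path, volume_root="/data/master_volume"):
    """Return the mkdir and put commands for `path` as argument lists.

    `path` is relative to the shared volume, for example
    "datasets/steam-dataset/file.jsonl".
    """
    directory = "/" + posixpath.dirname(path)
    source = posixpath.join(volume_root, path)
    return [
        ["hdfs", "dfs", "-mkdir", "-p", directory],
        ["hdfs", "dfs", "-put", source, directory],
    ]


for cmd in hdfs_put_commands("datasets/steam-dataset/some file.jsonl"):
    print(cmd)
```

Each list could then be passed to `container.exec_run(cmd)` in place of the formatted string.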

In [None]:
def hdfs_set_replication_level(number):
    container.exec_run(f"hdfs dfs -setrep -R {number} /")

hdfs_set_replication_level(3)

## Acquisition of variable data

Our process assumes that, based on part of the static data about games from Steam (steam_dataset), we select the games whose player counts over time we will request from the SteamCharts API using a very simple query:
```
https://steamcharts.com/app/<appid>/chart-data.json
```
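
As an illustration of this query, the sketch below builds the URL for a given application id and parses a response body. It assumes that the response is a JSON array of arrays, of which the first and last element of each entry are the ones that matter (this is how the Scala mapper later in this report treats it). The helper names and the sample body are made up.

```python
import json


def chart_data_url(appid):
    """Build the SteamCharts query URL for one application id."""
    return f"https://steamcharts.com/app/{appid}/chart-data.json"


def parse_chart_data(body):
    """Turn a response body into (first, last) pairs, one per sample."""
    samples = json.loads(body)
    return [(sample[0], sample[-1]) for sample in samples if sample]


# Made-up response body, only to show the assumed shape
sample_body = "[[1684108800000, 1234], [1684195200000, 1300]]"
print(chart_data_url(730))
# https://steamcharts.com/app/730/chart-data.json
print(parse_chart_data(sample_body))
# [(1684108800000, 1234), (1684195200000, 1300)]
```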

To acquire this data we prepared a map-reduce process that takes the required ids as input and returns the query results in the appropriate form (as shown in the illustration below). For now the selection of games is simulated (the first 20 are simply taken). The first subprocess, which aggregates the data, has been correctly implemented as a map-reduce process.

![](images/SteamChartsBig.png)

### Data aggregation

In [None]:
client = docker.from_env()
container = client.containers.get('master')

res1 = container.exec_run(("yarn jar /data/master_volume/map_reduce_jars/1.jar "
"/datasets/steam-dataset/steam_dataset/appinfo/store_data/steam_store_data.jsonl "
"/datasets/steam-dataset/steam_dataset/steamspy/basic/steam_spy_scrap.jsonl "
"/out_steam_1"))

res1

In [None]:
raw = container.exec_run("hdfs dfs -cat /out_steam_1/part-r-00000").output.decode('utf-8')
print(f"{raw[0:10000]}")

In [None]:
res2 = container.exec_run(("yarn jar /data/master_volume/map_reduce_jars/2.jar "
"/out_steam_1/part-r-00000 "
"/out_steam_2"))

res2

In [None]:
raw = container.exec_run("hdfs dfs -cat /out_steam_2/part-r-00000").output.decode('utf-8')
print(f"{raw[0:10000]}")

In [None]:
res3 = container.exec_run(("yarn jar /data/master_volume/map_reduce_jars/3.jar "
"/out_steam_2/part-r-00000 "
"/out_steam_3"))

res3

In [None]:
raw = container.exec_run("hdfs dfs -cat /out_steam_3/part-r-00000").output.decode('utf-8')
print(f"{raw[0:1000000]}")

### Data acquisition

## Map-reduce code snippets

As is probably easy to notice, the code below is written in Scala. The biggest piece of work needed to adapt map-reduce to Scala was writing a function that converts the iterator in reduce. This iterator is not compatible with the API, which made it necessary to convert it to an ArrayList first.

```scala
  class MyMapper extends HadoopJob.HadoopMapper[AnyRef, Text, Text, Text] {
    override def myMap(key: AnyRef, value: Text, emit: (Text, Text) => Unit): Unit = {
      // Each input line is `<key><whitespace><json>`; drop the key and keep the trimmed JSON payload
      val jsons  = value.toString.split("\n").map(x => x.dropWhile(!_.isWhitespace)).map(_.trim).toList
      val mapped = jsons.flatMap(x => Input.decoder.decodeJson(x).toOption)

      def downloadTimestamps(id: Int) = for {
        res  <- zio.http.Client.request(f"""https://steamcharts.com/app/$id/chart-data.json""")
        data <- res.body.asString
        json  = data.fromJson[ApiResult].getOrElse(List.empty)
      } yield json

      val workflow = ZIO.foreach(mapped)(x => downloadTimestamps(x.game_id).map(res => (x, res)))

      val result = runZIO(
        workflow
          .tapError(err => { ZIO.succeed(emit(Text("error!"), Text(err.getMessage))) })
          .orElse(ZIO.succeed(List.empty))
          .provide(zio.http.Client.default)
      ).map(x => Result(x._1.game_id, x._2.map(y => PlayCount(y.head, y.last))))

      result.foreach { x =>
        emit(new Text(x.game_id.toString), Text(x.playcounts.toJson))
      }
    }
  }

  class MyReducer extends HadoopJob.HadoopReducer[Text, Text, Text] {
    override def myReduce(key: Text, values: List[String], emit: (Text, Text) => Unit): Unit = {
      emit(key, Text(values.toString()))
    }
  }

  def main(args: Array[String]) = {

    java.lang.System.setProperty("java.net.preferIPv4Stack", "true")

    val conf = new Configuration
    val job  = Job.getInstance(conf, "word count")

    job.setJarByClass(classOf[Main.type])
    job.setMapperClass(classOf[MyMapper])
    job.setReducerClass(classOf[MyReducer])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[Text])

    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))

    java.lang.System.exit(
      if (job.waitForCompletion(true)) 0
      else 1
    )
  }
```