# Первая задача

In [None]:
from pymongo import MongoClient
import json


def connect_to_mongodb(db_name="test-database"):
    """Return the ``person`` collection of the given database.

    A ``MongoClient`` is created with no arguments, so the connection
    target is whatever PyMongo uses by default.

    Args:
        db_name: Name of the database to open on the client.

    Returns:
        The ``person`` collection handle of that database.
    """
    database = MongoClient()[db_name]
    return database.person


def parse_data(file_name):
    """Parse a ``key::value`` text dump into a list of record dicts.

    Records are separated by a line containing only ``=====``. Every other
    non-blank line is split on the first ``::`` into a key and a value. The
    values of ``salary``, ``id``, ``year`` and ``age`` are converted to
    ``int``; all other values are kept as strings. Empty records (for
    example two separators in a row) are skipped.

    Args:
        file_name: Path of the UTF-8 text file to read.

    Returns:
        A list with one dict per record, in file order.

    Raises:
        ValueError: If a data line has no ``::`` delimiter or a numeric
            field cannot be converted to ``int``.
    """
    int_fields = {"salary", "id", "year", "age"}
    items = []
    item = {}
    with open(file_name, "r", encoding="utf-8") as f:
        # Iterate the file lazily instead of loading every line up front.
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                # Blank lines carry no data and cannot be split into a pair.
                continue
            if line == "=====":
                # Comparing the stripped line also recognises a separator
                # that is the last line of a file without a final newline.
                if item:
                    items.append(item)
                item = {}
                continue
            # Split on the first "::" only, so values may contain "::".
            key, delimiter, value = line.partition("::")
            if not delimiter:
                raise ValueError(
                    f"malformed line (expected 'key::value'): {line!r}"
                )
            item[key] = int(value) if key in int_fields else value

    # Keep a final record that is not followed by a separator line.
    if item:
        items.append(item)
    return items


def sort_by_salary(collection):
    """Return up to ten documents ordered by ``salary`` descending.

    Args:
        collection: Collection-like object exposing ``find``.

    Returns:
        A list of the documents yielded by the cursor (``limit=10``).
    """
    # ``sort(key, direction)`` is accepted by every PyMongo release; a dict
    # argument is rejected with TypeError by older releases.
    cursor = collection.find(limit=10).sort("salary", -1)
    return list(cursor)


def filter_by_age(collection):
    """Return up to 15 documents with ``age`` below 30, by salary descending.

    Args:
        collection: Collection-like object exposing ``find``.

    Returns:
        A list of the documents yielded by the cursor (``limit=15``).
    """
    younger_than_30 = {"age": {"$lt": 30}}
    # ``sort(key, direction)`` is accepted by every PyMongo release; a dict
    # argument is rejected with TypeError by older releases.
    cursor = collection.find(younger_than_30, limit=15).sort("salary", -1)
    return list(cursor)


def filter_by_city_and_job(collection):
    """Return up to ten documents for one city and three jobs, by age.

    The query matches ``city == "Рига"`` and ``job`` in
    ``["Врач", "Строитель", "Повар"]``; results are ordered by ``age``
    ascending.

    Args:
        collection: Collection-like object exposing ``find``.

    Returns:
        A list of the documents yielded by the cursor (``limit=10``).
    """
    criteria = {
        "city": "Рига",
        "job": {"$in": ["Врач", "Строитель", "Повар"]},
    }
    # ``sort(key, direction)`` is accepted by every PyMongo release; a dict
    # argument is rejected with TypeError by older releases.
    cursor = collection.find(criteria, limit=10).sort("age", 1)
    return list(cursor)


def count_obj(collection):
    """Count documents matching a combined age, year and salary filter.

    A document is counted when ``age`` is strictly between 20 and 30,
    ``year`` is within 2019..2022 inclusive, and ``salary`` lies in
    (50000, 75000] or in (125000, 150000).

    Args:
        collection: Collection-like object exposing ``count_documents``.

    Returns:
        Whatever ``count_documents`` returns for that filter.
    """
    salary_ranges = [
        {"salary": {"$gt": 50000, "$lte": 75000}},
        {"salary": {"$gt": 125000, "$lt": 150000}},
    ]
    criteria = {
        "age": {"$gt": 20, "$lt": 30},
        "year": {"$gte": 2019, "$lte": 2022},
        "$or": salary_ranges,
    }
    return collection.count_documents(criteria)


# Task 1 driver: parse the source file, run each query and dump its result.
data = parse_data("tasks/task_1_item.text")
conn = connect_to_mongodb()
# The insert is disabled in the original code; presumably the collection
# was populated on an earlier run. Re-enable to load ``data``.
# conn.insert_many(data)

# ``default=str`` stringifies any value json cannot encode natively.
sorted_salary = sort_by_salary(conn)
with open("results/task1_sorted_by_salary.json", "w", encoding="utf-8") as f:
    json.dump(sorted_salary, f, ensure_ascii=False, default=str)

filtered_age = filter_by_age(conn)
with open("results/task1_filtered_age.json", "w", encoding="utf-8") as f:
    json.dump(filtered_age, f, ensure_ascii=False, default=str)

filtered_city_job = filter_by_city_and_job(conn)
with open("results/task1_filtered_city_job.json", "w", encoding="utf-8") as f:
    json.dump(filtered_city_job, f, ensure_ascii=False, default=str)

cnt_obj = count_obj(conn)
with open("results/task1_cnt_obj.json", "w", encoding="utf-8") as f:
    json.dump(cnt_obj, f, ensure_ascii=False, default=str)

# Вторая задача


In [None]:
from pymongo import MongoClient
import msgpack
import json


def connect_to_mongodb(db_name="test-database"):
    """Open the named database and return its ``person`` collection.

    The ``MongoClient`` is built with no arguments, so PyMongo's default
    connection settings apply.

    Args:
        db_name: Name of the database to open on the client.

    Returns:
        The ``person`` collection handle of that database.
    """
    mongo = MongoClient()
    return mongo[db_name].person


def load_data(file_name):
    """Read a MessagePack file and return the decoded object.

    Args:
        file_name: Path of the file; it is read in binary mode.

    Returns:
        The object produced by ``msgpack.unpackb`` for the file's bytes.
    """
    with open(file_name, "rb") as src:
        payload = src.read()
    return msgpack.unpackb(payload)


def get_stat_by_salary(collection):
    """Return max, min and average ``salary`` over the whole collection.

    All documents fall into one group with the constant ``_id`` of
    ``"result"``.

    Args:
        collection: Collection-like object exposing ``aggregate``.

    Returns:
        A list of the documents produced by the aggregation.
    """
    group_stage = {
        "$group": {
            "_id": "result",
            "max": {"$max": "$salary"},
            "min": {"$min": "$salary"},
            "avg": {"$avg": "$salary"},
        }
    }
    return list(collection.aggregate([group_stage]))


def get_freq_by_job(collection):
    """Count documents per ``job``, most frequent first.

    Args:
        collection: Collection-like object exposing ``aggregate``.

    Returns:
        A list of the documents produced by the aggregation; each carries
        the job as ``_id`` and its ``count``.
    """
    count_per_job = {"$group": {"_id": "$job", "count": {"$sum": 1}}}
    most_frequent_first = {"$sort": {"count": -1}}
    pipeline = [count_per_job, most_frequent_first]
    return list(collection.aggregate(pipeline))


def get_stat_by_column(collection, group_name, stat_name):
    """Return max, min and average of one field, grouped by another.

    Args:
        collection: Collection-like object exposing ``aggregate``.
        group_name: Field name to group by (without the leading ``$``).
        stat_name: Field name to aggregate (without the leading ``$``).

    Returns:
        A list of the documents produced by the aggregation, one per
        distinct ``group_name`` value.
    """
    group_ref = "$" + group_name
    stat_ref = "$" + stat_name
    pipeline = [
        {
            "$group": {
                "_id": group_ref,
                "max": {"$max": stat_ref},
                "min": {"$min": stat_ref},
                "avg": {"$avg": stat_ref},
            }
        }
    ]
    return list(collection.aggregate(pipeline))


def get_max_salary_by_min_age_match(collection):
    """Return the highest-salary document among the youngest people.

    Documents are sorted by ``age`` ascending and then ``salary``
    descending, and only the first one is kept. The result is projected
    to ``_id``, ``salary`` and ``age``.

    Args:
        collection: Collection-like object exposing ``aggregate``.

    Returns:
        A list of the documents produced by the aggregation (at most one,
        given the ``$limit`` stage).
    """
    # Key order inside ``$sort`` matters: age is the primary sort key.
    q = [
        {"$sort": {"age": 1, "salary": -1}},
        {"$limit": 1},
        {"$project": {"_id": 1, "salary": 1, "age": 1}},
    ]
    return list(collection.aggregate(q))


def get_min_salary_by_max_age_match(collection):
    """Return the lowest-salary document among the oldest people.

    Documents are sorted by ``age`` descending and then ``salary``
    ascending, and only the first one is kept. The result is projected
    to ``_id``, ``salary`` and ``age``.

    Args:
        collection: Collection-like object exposing ``aggregate``.

    Returns:
        A list of the documents produced by the aggregation (at most one,
        given the ``$limit`` stage).
    """
    # Key order inside ``$sort`` matters: age is the primary sort key.
    oldest_then_poorest = {"$sort": {"age": -1, "salary": 1}}
    first_only = {"$limit": 1}
    keep_fields = {"$project": {"_id": 1, "salary": 1, "age": 1}}
    pipeline = [oldest_then_poorest, first_only, keep_fields]
    return list(collection.aggregate(pipeline))


def get_sorted_stat_by_condition(collection):
    """Return per-city age statistics for people earning over 50000.

    Documents with ``salary`` greater than 50000 are grouped by ``city``;
    min, max and average ``age`` are computed per group and the groups
    are ordered by ``max`` descending.

    Args:
        collection: Collection-like object exposing ``aggregate``.

    Returns:
        A list of the documents produced by the aggregation.
    """
    well_paid = {"$match": {"salary": {"$gt": 50000}}}
    age_by_city = {
        "$group": {
            "_id": "$city",
            "min": {"$min": "$age"},
            "max": {"$max": "$age"},
            "avg": {"$avg": "$age"},
        }
    }
    oldest_first = {"$sort": {"max": -1}}
    pipeline = [well_paid, age_by_city, oldest_first]
    return list(collection.aggregate(pipeline))


def get_salary_stat_by_condition(collection):
    """Return salary statistics for a city/job/age-range selection.

    Matches documents whose ``city`` and ``job`` are in the listed values
    and whose ``age`` is strictly inside (18, 25) or (50, 65). All
    matches are put into one group with the constant ``_id`` of
    ``"_result"``, for which min, max and average ``salary`` are computed.

    Args:
        collection: Collection-like object exposing ``aggregate``.

    Returns:
        A list of the documents produced by the aggregation.
    """
    age_ranges = [
        {"age": {"$gt": 18, "$lt": 25}},
        {"age": {"$gt": 50, "$lt": 65}},
    ]
    selection = {
        "$match": {
            "city": {"$in": ["Москва", "Прага", "Малага"]},
            "job": {"$in": ["IT-специалист", "Повар", "Учитель"]},
            "$or": age_ranges,
        }
    }
    salary_stats = {
        "$group": {
            "_id": "_result",
            "min": {"$min": "$salary"},
            "max": {"$max": "$salary"},
            "avg": {"$avg": "$salary"},
        }
    }
    return list(collection.aggregate([selection, salary_stats]))


def get_avg_salary_by_job_in_moscow(collection):
    """Return the average salary per job for ``city == "Москва"``.

    Groups are ordered by the average salary, highest first.

    Args:
        collection: Collection-like object exposing ``aggregate``.

    Returns:
        A list of the documents produced by the aggregation.
    """
    only_moscow = {"$match": {"city": "Москва"}}
    avg_per_job = {"$group": {"_id": "$job", "avg": {"$avg": "$salary"}}}
    highest_first = {"$sort": {"avg": -1}}
    pipeline = [only_moscow, avg_per_job, highest_first]
    return list(collection.aggregate(pipeline))


# Task 2 driver: load the MessagePack data, run each aggregation and dump
# its result to a JSON file.
df = load_data("tasks/task_2_item.msgpack")
conn = connect_to_mongodb()
# The insert is disabled in the original code; presumably the collection
# was populated on an earlier run. Re-enable to load ``df``.
# conn.insert_many(df)

# ``default=str`` stringifies any value json cannot encode natively.
salary_stats = get_stat_by_salary(conn)
with open("results/task2_salary_stats.json", "w", encoding="utf-8") as f:
    json.dump(salary_stats, f, ensure_ascii=False, default=str)

job_freq = get_freq_by_job(conn)
with open("results/task2_job_freq.json", "w", encoding="utf-8") as f:
    json.dump(job_freq, f, ensure_ascii=False, default=str)

salary_stats_by_city = get_stat_by_column(conn, "city", "salary")
with open("results/task2_salary_stats_by_city.json", "w", encoding="utf-8") as f:
    json.dump(salary_stats_by_city, f, ensure_ascii=False, default=str)

salary_stats_by_job = get_stat_by_column(conn, "job", "salary")
with open("results/task2_salary_stats_by_job.json", "w", encoding="utf-8") as f:
    json.dump(salary_stats_by_job, f, ensure_ascii=False, default=str)

age_stats_by_city = get_stat_by_column(conn, "city", "age")
with open("results/task2_age_stats_by_city.json", "w", encoding="utf-8") as f:
    json.dump(age_stats_by_city, f, ensure_ascii=False, default=str)

age_stats_by_job = get_stat_by_column(conn, "job", "age")
with open("results/task2_age_stats_by_job.json", "w", encoding="utf-8") as f:
    json.dump(age_stats_by_job, f, ensure_ascii=False, default=str)

max_salary_min_age = get_max_salary_by_min_age_match(conn)
with open("results/task2_max_salary_min_age.json", "w", encoding="utf-8") as f:
    json.dump(max_salary_min_age, f, ensure_ascii=False, default=str)

min_salary_max_age = get_min_salary_by_max_age_match(conn)
with open("results/task2_min_salary_max_age.json", "w", encoding="utf-8") as f:
    json.dump(min_salary_max_age, f, ensure_ascii=False, default=str)

sorted_stats_condition = get_sorted_stat_by_condition(conn)
with open("results/task2_sorted_stats_condition.json", "w", encoding="utf-8") as f:
    json.dump(sorted_stats_condition, f, ensure_ascii=False, default=str)

salary_stats_condition = get_salary_stat_by_condition(conn)
with open("results/task2_salary_stats_condition.json", "w", encoding="utf-8") as f:
    json.dump(salary_stats_condition, f, ensure_ascii=False, default=str)

avg_salary_in_msc = get_avg_salary_by_job_in_moscow(conn)
with open("results/task2_avg_salary_in_msc.json", "w", encoding="utf-8") as f:
    json.dump(avg_salary_in_msc, f, ensure_ascii=False, default=str)

# Третья задача

In [None]:
from pymongo import MongoClient
import pickle
import json


def connect_to_mongodb(db_name="test-database"):
    """Return the ``person`` collection from the named database.

    The client is created via ``MongoClient()`` with no arguments, i.e.
    with PyMongo's default connection settings.

    Args:
        db_name: Name of the database to open on the client.

    Returns:
        The ``person`` collection handle of that database.
    """
    return MongoClient()[db_name].person


def load_pkl_data(path):
    """Load and return the object pickled in the file at ``path``.

    NOTE(security): ``pickle.load`` can execute arbitrary code while
    unpickling, so only call this on files from a trusted source.

    Args:
        path: Path of the pickle file; it is opened in binary mode.

    Returns:
        The unpickled object.
    """
    # Named ``src`` rather than ``input`` so the builtin is not shadowed.
    with open(path, "rb") as src:
        return pickle.load(src)


def delete_by_salary(collection):
    """Delete documents with ``salary`` below 25000 or above 175000.

    The object returned by ``delete_many`` is printed; nothing is returned.

    Args:
        collection: Collection-like object exposing ``delete_many``.
    """
    too_low = {"salary": {"$lt": 25000}}
    too_high = {"salary": {"$gt": 175000}}
    outcome = collection.delete_many({"$or": [too_low, too_high]})
    print(outcome)


def update_age(collection):
    """Increment ``age`` by 1 in every document of the collection.

    The object returned by ``update_many`` is printed; nothing is returned.

    Args:
        collection: Collection-like object exposing ``update_many``.
    """
    everyone = {}
    one_year_older = {"$inc": {"age": 1}}
    outcome = collection.update_many(everyone, one_year_older)
    print(outcome)


def increase_salary_by_job(collection):
    """Multiply ``salary`` by 1.05 for a fixed list of jobs.

    The object returned by ``update_many`` is printed; nothing is returned.

    Args:
        collection: Collection-like object exposing ``update_many``.
    """
    # Named ``query`` rather than ``filter`` so the builtin is not shadowed.
    query = {
        "job": {"$in": ["Учитель", "Косметолог", "Медсестра", "IT-специалист"]}
    }
    update = {"$mul": {"salary": 1.05}}

    result = collection.update_many(query, update)
    print(result)


def increase_salary_by_city(collection):
    """Multiply ``salary`` by 1.07 for a fixed list of cities.

    The object returned by ``update_many`` is printed; nothing is returned.

    Args:
        collection: Collection-like object exposing ``update_many``.
    """
    # Named ``query`` rather than ``filter`` so the builtin is not shadowed.
    query = {
        "city": {"$in": ["Москва", "Санкт-Петербург", "Барселона", "Гранада"]}
    }
    update = {"$mul": {"salary": 1.07}}

    result = collection.update_many(query, update)
    print(result)


def increase_salary_by_many_conditions(collection):
    """Multiply ``salary`` by 1.1 for a city/job/age selection.

    A document is updated when its ``city`` and ``job`` are both in the
    listed values and its ``age`` is strictly between 20 and 50. The
    object returned by ``update_many`` is printed; nothing is returned.

    Args:
        collection: Collection-like object exposing ``update_many``.
    """
    # Named ``query`` rather than ``filter`` so the builtin is not shadowed.
    query = {
        "city": {"$in": ["Москва", "Санкт-Петербург", "Барселона", "Гранада"]},
        "job": {"$in": ["Учитель", "Косметолог", "Медсестра", "IT-специалист"]},
        "age": {"$gt": 20, "$lt": 50},
    }
    update = {"$mul": {"salary": 1.1}}

    result = collection.update_many(query, update)
    print(result)


def delete_by_many_conditions(collection):
    """Delete documents with ``year`` before 2000 and an out-of-range age.

    The age condition matches ``age`` below 18 or above 55. The object
    returned by ``delete_many`` is printed; nothing is returned.

    Args:
        collection: Collection-like object exposing ``delete_many``.
    """
    age_outside_range = [
        {"age": {"$lt": 18}},
        {"age": {"$gt": 55}},
    ]
    criteria = {"year": {"$lt": 2000}, "$or": age_outside_range}
    outcome = collection.delete_many(criteria)
    print(outcome)


# Task 3 driver: load the pickled data and apply the modifications in order.
data = load_pkl_data("tasks/task_3_item.pkl")
conn = connect_to_mongodb()
# The insert is disabled in the original code; presumably the collection
# was populated on an earlier run. Re-enable to load ``data``.
# conn.insert_many(data)

# NOTE(review): ``update_age`` is defined above but is not part of this
# sequence — confirm whether that omission is intentional.
for operation in (
    delete_by_salary,
    increase_salary_by_job,
    increase_salary_by_city,
    increase_salary_by_many_conditions,
    delete_by_many_conditions,
):
    operation(conn)

# Четвёртая задача

In [None]:
from pymongo import MongoClient
import json
import pandas as pd


def load_csv(filename):
    """Read a CSV file and return its rows as a list of dicts.

    Args:
        filename: Path of the CSV file, passed to ``pandas.read_csv``.

    Returns:
        A list with one ``{column: value}`` dict per row.
    """
    frame = pd.read_csv(filename)
    return frame.to_dict(orient="records")


def load_json(filename):
    """Read a UTF-8 JSON file and return the decoded object.

    Args:
        filename: Path of the JSON file.

    Returns:
        The object produced by ``json.load``.
    """
    # Named ``src`` rather than ``input`` so the builtin is not shadowed.
    with open(filename, "r", encoding="utf-8") as src:
        return json.load(src)


def save_json(path, file):
    """Serialize ``file`` as JSON into the UTF-8 file at ``path``.

    Non-ASCII characters are written as-is (``ensure_ascii=False``) and
    values json cannot encode natively are converted with ``str``.

    Args:
        path: Destination file path; the file is overwritten.
        file: The object to serialize (despite the name, not a file handle).
    """
    with open(path, "w", encoding="utf-8") as out:
        json.dump(file, out, ensure_ascii=False, default=str)


def connect_to_mongodb(db_name="test-database"):
    """Return a handle to the named database.

    Unlike a collection-returning helper, this returns the database
    object itself; callers select the collection (e.g. ``.wines``).
    The client is created via ``MongoClient()`` with no arguments.

    Args:
        db_name: Name of the database to open on the client.

    Returns:
        The database handle.
    """
    return MongoClient()[db_name]


# выборка
def sort_by_quality(collection):
    """Return up to ten documents ordered by ``quality`` descending.

    Args:
        collection: Collection-like object exposing ``find``.

    Returns:
        A list of the documents yielded by the cursor (``limit=10``).
    """
    # ``sort(key, direction)`` is accepted by every PyMongo release; a dict
    # argument is rejected with TypeError by older releases.
    cursor = collection.find(limit=10).sort("quality", -1)
    return list(cursor)


def get_quality_above_7(collection):
    """Return all documents with ``quality`` of 7 or higher, best first.

    The bound is inclusive (``$gte``), despite the function's name.

    Args:
        collection: Collection-like object exposing ``find``.

    Returns:
        A list of the matching documents ordered by ``quality`` descending.
    """
    # ``sort(key, direction)`` is accepted by every PyMongo release; a dict
    # argument is rejected with TypeError by older releases.
    cursor = collection.find({"quality": {"$gte": 7}}).sort("quality", -1)
    return list(cursor)


def sort_by_alco(collection):
    """Return up to 30 documents ordered by ``alcohol`` ascending.

    Args:
        collection: Collection-like object exposing ``find``.

    Returns:
        A list of the documents yielded by the cursor (``limit=30``).
    """
    # ``sort(key, direction)`` is accepted by every PyMongo release; a dict
    # argument is rejected with TypeError by older releases.
    cursor = collection.find(limit=30).sort("alcohol", 1)
    return list(cursor)


def get_bad_red_wines(collection):
    """Return all red wines with ``quality`` of 3 or lower, worst first.

    Args:
        collection: Collection-like object exposing ``find``.

    Returns:
        A list of the matching documents ordered by ``quality`` ascending.
    """
    criteria = {"quality": {"$lte": 3}, "type": "red"}
    # ``sort(key, direction)`` is accepted by every PyMongo release; a dict
    # argument is rejected with TypeError by older releases.
    cursor = collection.find(criteria).sort("quality", 1)
    return list(cursor)


def get_sweet_white_wines(collection):
    """Return up to 15 white wines ordered by ``residual sugar`` descending.

    Args:
        collection: Collection-like object exposing ``find``.

    Returns:
        A list of the documents yielded by the cursor (``limit=15``).
    """
    # ``sort(key, direction)`` is accepted by every PyMongo release; a dict
    # argument is rejected with TypeError by older releases.
    cursor = collection.find({"type": "white"}, limit=15).sort("residual sugar", -1)
    return list(cursor)


# выборка с агрегацией
def get_freq_by_type(collection):
    """Count documents per ``type`` value.

    Args:
        collection: Collection-like object exposing ``aggregate``.

    Returns:
        A list of the documents produced by the aggregation; each carries
        the type as ``_id`` and its ``count``.
    """
    count_per_type = {"$group": {"_id": "$type", "count": {"$sum": 1}}}
    return list(collection.aggregate([count_per_type]))


def get_stat_by_column(collection, group_name, stat_name):
    """Return max, min and average of one field, grouped by another.

    Args:
        collection: Collection-like object exposing ``aggregate``.
        group_name: Field name to group by (without the leading ``$``).
        stat_name: Field name to aggregate (without the leading ``$``).

    Returns:
        A list of the documents produced by the aggregation, one per
        distinct ``group_name`` value.
    """
    stat_ref = "$" + stat_name
    grouping = {
        "_id": "$" + group_name,
        "max": {"$max": stat_ref},
        "min": {"$min": stat_ref},
        "avg": {"$avg": stat_ref},
    }
    return list(collection.aggregate([{"$group": grouping}]))


def get_ph_stat_by_condition(collection):
    """Return per-type ``pH`` statistics for wines of quality 4 to 6.

    Documents with ``quality`` in 4..6 inclusive are grouped by ``type``;
    min, max and average ``pH`` are computed per group.

    Args:
        collection: Collection-like object exposing ``aggregate``.

    Returns:
        A list of the documents produced by the aggregation.
    """
    mid_quality = {"$match": {"quality": {"$gte": 4, "$lte": 6}}}
    ph_by_type = {
        "$group": {
            "_id": "$type",
            "min": {"$min": "$pH"},
            "max": {"$max": "$pH"},
            "avg": {"$avg": "$pH"},
        }
    }
    return list(collection.aggregate([mid_quality, ph_by_type]))


# обновление/удаление данных
def increase_sulphates_by_type(collection):
    """Multiply ``sulphates`` by 1.02 for every white wine.

    The object returned by ``update_many`` is printed; nothing is returned.

    Args:
        collection: Collection-like object exposing ``update_many``.
    """
    # Named ``query`` rather than ``filter`` so the builtin is not shadowed.
    query = {"type": "white"}
    update = {"$mul": {"sulphates": 1.02}}

    result = collection.update_many(query, update)
    print(result)


def update_fixed_acidity(collection):
    """Increase ``fixed acidity`` by 0.1 in every document.

    The object returned by ``update_many`` is printed; nothing is returned.

    Args:
        collection: Collection-like object exposing ``update_many``.
    """
    every_wine = {}
    bump_acidity = {"$inc": {"fixed acidity": 0.1}}
    outcome = collection.update_many(every_wine, bump_acidity)
    print(outcome)


def delete_by_ph(collection):
    """Delete documents whose ``pH`` is at most 2.9 or at least 3.5.

    The object returned by ``delete_many`` is printed; nothing is returned.

    Args:
        collection: Collection-like object exposing ``delete_many``.
    """
    too_acidic = {"pH": {"$lte": 2.9}}
    too_basic = {"pH": {"$gte": 3.5}}
    outcome = collection.delete_many({"$or": [too_acidic, too_basic]})
    print(outcome)


def increase_alco_by_density(collection):
    """Multiply ``alcohol`` by 1.01 where ``density`` is at least 0.995.

    The object returned by ``update_many`` is printed; nothing is returned.

    Args:
        collection: Collection-like object exposing ``update_many``.
    """
    # Named ``query`` rather than ``filter`` so the builtin is not shadowed.
    query = {"density": {"$gte": 0.995}}
    update = {"$mul": {"alcohol": 1.01}}

    result = collection.update_many(query, update)
    print(result)


def delete_by_quality(collection):
    """Delete documents with ``quality`` of 3 or lower.

    The object returned by ``delete_many`` is printed; nothing is returned.

    Args:
        collection: Collection-like object exposing ``delete_many``.
    """
    low_quality = {"quality": {"$lte": 3}}
    outcome = collection.delete_many(low_quality)
    print(outcome)


# Task 4 driver: load both wine data sets, run the queries and
# aggregations, save each result, then apply the modifications.
white_df = load_csv("tasks/white_wine.csv")
red_df = load_json("tasks/red_wine.json")
db = connect_to_mongodb()
wines = db.wines
# The inserts are disabled in the original code; presumably the collection
# was populated on an earlier run. Re-enable to load both data sets.
# db.wines.insert_many(white_df)
# db.wines.insert_many(red_df)

# Plain queries.
top_wines = sort_by_quality(wines)
save_json("results/task4_top_wines.json", top_wines)
filtered_wines = get_quality_above_7(wines)
save_json("results/task4_filtered_wines.json", filtered_wines)
sorted_by_alco = sort_by_alco(wines)
save_json("results/task4_sorted_by_alco.json", sorted_by_alco)
bad_red_wines = get_bad_red_wines(wines)
save_json("results/task4_bad_red_wines.json", bad_red_wines)
sweet_white_wines = get_sweet_white_wines(wines)
save_json("results/task4_sweet_white_wines.json", sweet_white_wines)

# Aggregations.
type_freq = get_freq_by_type(wines)
save_json("results/task4_type_freq.json", type_freq)
stat_by_quality = get_stat_by_column(wines, "type", "quality")
save_json("results/task4_stat_by_quality.json", stat_by_quality)
stat_by_alcohol = get_stat_by_column(wines, "type", "alcohol")
save_json("results/task4_stat_by_alcohol.json", stat_by_alcohol)
stat_by_sugar = get_stat_by_column(wines, "type", "residual sugar")
save_json("results/task4_stat_by_sugar.json", stat_by_sugar)
ph_stat = get_ph_stat_by_condition(wines)
save_json("results/task4_ph_stat.json", ph_stat)

# Updates and deletions, applied in this order.
for modify in (
    increase_sulphates_by_type,
    update_fixed_acidity,
    delete_by_ph,
    increase_alco_by_density,
    delete_by_quality,
):
    modify(wines)